RabbitMQ DefaultConsumer causing too many consumer tags


I have a RabbitMQ client application that listens to a specific queue. The client creates an instance of DefaultConsumer and implements the handleDelivery method. Here is the code:

    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    public void receiveMessages() {
        try {
//            channel.basicQos(pollCount);
            Message message = new Message();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    String response = new String(body, "UTF-8");
                    if (response != null) {
                        message.setId(NUID.nextGlobal());
                        message.setPayload(response);
                        message.setDeliveryTag(deliveryTag);
                        messages.add(message);
                        // The {} placeholder is needed for the payload to appear in the log line.
                        logger.info("Message received: {}", message.getPayload());
                    }
                }
            };
            logger.debug("**********Channel status: " + channel.isOpen());
            channel.basicConsume(queueName, false, consumer);
        } catch (Exception e) {
            logger.error("Exception while getting messages from Rabbit ", e);

        }
    }

A thread calls receiveMessages() every 500 ms and then drains the received messages into a separate List for consumption. Because of this polling, the RabbitMQ management console shows new consumer tags being created continuously, and their number keeps growing (see the screenshot). Is it normal to see the consumer tags increase like this?


Solution

  • I finally found a working solution. As Luke Bakken pointed out, no polling is required. Each call to channel.basicConsume() registers a new consumer with its own consumer tag, which is why the tags kept accumulating. I now call receiveMessages() only once; after that, my consumer receives callbacks as messages are published to the queue.

    protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    public void receiveMessages() {
        try {
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                // Create a fresh Message per delivery; reusing a single instance
                // would overwrite the entries already placed in the queue.
                Message message = new Message();
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                String response = new String(delivery.getBody(), "UTF-8");
                message.setId(NUID.nextGlobal());
                message.setPayload(response);
                message.setDeliveryTag(deliveryTag);
                messages.add(message);
                logger.info("Message received: {}", message.getPayload());
            };
            // Register the consumer once; the broker then pushes deliveries to the callback.
            channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
        } catch (Exception e) {
            logger.error("Exception while getting messages from Rabbit ", e);
        }
    }
    

    The RabbitMQ console now shows only one consumer tag entry under the bound queue.
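
If the 500 ms drain loop is still needed, the idea can be sketched with the JDK alone: register the consumer once, and let the poller touch only the local buffer. This is a minimal sketch, not the actual client code. The class ConsumeOnceSketch and all of its methods are hypothetical, payloads are plain strings instead of Message objects, and the basicConsume call is replaced by a counter so that the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: register once, drain many times.
class ConsumeOnceSketch {
    // Stand-in for the buffer that the delivery callback fills.
    private final LinkedBlockingQueue<String> messages = new LinkedBlockingQueue<>();
    // Flips to true the first time the consumer is registered.
    private final AtomicBoolean registered = new AtomicBoolean(false);
    // Counts registrations; on a real broker each one would appear as a consumer tag.
    private final AtomicInteger registrations = new AtomicInteger();

    /** Registers the consumer at most once; repeated calls do nothing. */
    public void startConsuming() {
        if (registered.compareAndSet(false, true)) {
            // Assumption: real code would call channel.basicConsume(...) here.
            registrations.incrementAndGet();
        }
    }

    /** Simulates the broker pushing one message to the delivery callback. */
    public void onDelivery(String payload) {
        messages.add(payload);
    }

    /** What the periodic poller should do: empty the local buffer, nothing else. */
    public List<String> drain() {
        List<String> batch = new ArrayList<>();
        messages.drainTo(batch); // moves everything currently buffered, without blocking
        return batch;
    }

    public int registrationCount() {
        return registrations.get();
    }

    public static void main(String[] args) {
        ConsumeOnceSketch sketch = new ConsumeOnceSketch();
        sketch.startConsuming();
        sketch.startConsuming(); // ignored: already registered
        sketch.onDelivery("a");
        sketch.onDelivery("b");
        System.out.println(sketch.drain());
        System.out.println(sketch.registrationCount());
    }
}
```

The compareAndSet guard makes an accidental second call harmless, so a scheduled thread can call drain() as often as it likes without creating additional consumers.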