Search code examples
rabbitmqspring-amqpspring-rabbit

Manually ack messages in RabbitMQ


Previously I was reading all the messages present in the queue, but now I have to return specific amount of message based of users choice(count).

I try to change the for loop accordingly but its reading all the message because of auto acknowledge. So I tried changing it to manual in config file.

In my program how to ack message manually after reading msg(currently i am using AmqpTemplate to receive and i don't have reference of channel)?

    Properties properties = admin.getQueueProperties("queue_name");
    if(null != properties)
    {
        Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());          
        while(messageCount > 0)
        {
            Message msg = amqpTemplate.receive(queue_name);
            String value = new String(msg.getBody());
            
            valueList.add(value);
            messageCount--;
        }
}

Any help is highly appreciable, Thanks in advance.


Solution

  • You cannot manually ack with the receive() method - use a SimpleMessageListenerContainer for an event-driven consumer with MANUAL acks and a ChannelAwareMessageListener. Or, use the template's execute() method which gives you access to the Channel - but then you will be using the lower-level RabbitMQ API, not the Message abstraction.

    EDIT:

    You need to learn the underlying RabbitMQ Java API to use execute, but something like this will work...

        final int messageCount = 3;
        boolean result = template.execute(new ChannelCallback<Boolean>() {
    
            @Override
            public Boolean doInRabbit(final Channel channel) throws Exception {
                int n = messageCount;
                channel.basicQos(messageCount); // prefetch
                long deliveryTag = 0;
                while (n > 0) {
                    GetResponse result = channel.basicGet("si.test.queue", false);
                    if (result != null) {
                        System.out.println(new String(result.getBody()));
                        deliveryTag = result.getEnvelope().getDeliveryTag();
                        n--;
                    }
                    else {
                        Thread.sleep(1000);
                    }
                }
                if (deliveryTag > 0) {
                    channel.basicAck(deliveryTag, true);
                }
                return true;
            }
        });