Can I find out in a @RabbitListener whether any messages are prefetched for this consumer?


I need to know whether there are more messages coming for this consumer.

Right now I count the messages on the queue, but that only gives me what is left on the queue, not what has already been prefetched.

    @RabbitListener(queues = QUEUENAME)
    public void receive(Message message, Channel channel) throws IOException {
        // Counts only messages still on the queue, not those already prefetched
        long messagesOnQueue = channel.messageCount(QUEUENAME);
        if (messagesOnQueue > 1) {
            // add message to list
        } else {
            // save the list
        }
    }

It would be really great if there were a way to tell whether messages have been prefetched for this consumer. Is that possible? If I can get that count, then I don't care whether there are messages on the queue as well.

After receiving suggestions from Gary I changed the implementation to the following, and it works. A message has to be acknowledged manually on the same channel it was delivered on, but you can save a reference to that channel in case you need it in another thread.

In your Spring Boot application.yml add this:

spring:
  rabbitmq:
    listener:
      direct:
        prefetch: 200
      simple:
        prefetch: 200
        acknowledgeMode: MANUAL

Code from the consumer.

    // The list we build up and save in one transaction
    private Set<PayloadDto> unhandledPayloads = new HashSet<>();
    private long latestTag = 0L;
    private Channel latestChannel;

    @RabbitListener(queues = QUEUE_NAME, id = "consumerId")
    public void receive(Message message, Channel channel) throws IOException {
        PayloadDto payloadDto = parse(message.getBody());

        unhandledPayloads.add(payloadDto);
        latestTag = message.getMessageProperties().getDeliveryTag();
        latestChannel = channel;
        if (unhandledPayloads.size() > UNHANDLED_PAYLOADS_LIMIT) {
            service.createOrUpdate(unhandledPayloads);
            unhandledPayloads.clear();
            // multiple = true acks every delivery up to and including latestTag
            channel.basicAck(latestTag, true);
        }
    }

    // Runs when the listener container has been idle: save whatever is left
    @EventListener(condition = "event.listenerId == 'consumerId'")
    public void onApplicationEvent(ListenerContainerIdleEvent event) throws IOException {
        if (!unhandledPayloads.isEmpty()) {
            service.createOrUpdate(unhandledPayloads);
            unhandledPayloads.clear();
            latestChannel.basicAck(latestTag, true);
        }
    }

The reason we build up a list before saving it is so that we can do a batch insert, which runs faster.
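For anyone who wants to test the buffering logic without a broker, here is a minimal sketch of the same accumulate-then-flush idea pulled out into a plain class. `BatchAckBuffer` and its `Acker` interface are hypothetical names of mine, not part of Spring AMQP; `Acker` only stands in for `Channel.basicAck(deliveryTag, multiple)`. The methods are synchronized as a precaution, in case the idle event arrives on a different thread than the deliveries. Also keep in mind that `ListenerContainerIdleEvent` is only published when an idle event interval is configured on the container (with Spring Boot, `spring.rabbitmq.listener.simple.idle-event-interval`).

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers payloads and acks a whole batch with one cumulative ack. */
class BatchAckBuffer<T> {

    /** Stand-in for Channel.basicAck(deliveryTag, multiple); an assumption for this sketch. */
    interface Acker {
        void ack(long deliveryTag, boolean multiple) throws IOException;
    }

    private final int limit;
    private final Consumer<List<T>> saver;
    private final List<T> pending = new ArrayList<>();
    private long latestTag;
    private Acker latestAcker;

    BatchAckBuffer(int limit, Consumer<List<T>> saver) {
        this.limit = limit;
        this.saver = saver;
    }

    /** Call for every delivery; flushes once more than `limit` payloads are buffered. */
    synchronized void add(T payload, long deliveryTag, Acker acker) throws IOException {
        pending.add(payload);
        latestTag = deliveryTag;
        latestAcker = acker;
        if (pending.size() > limit) {
            flush();
        }
    }

    /** Call from the idle-event handler to save whatever is left. */
    synchronized void flush() throws IOException {
        if (pending.isEmpty()) {
            return;
        }
        saver.accept(new ArrayList<>(pending)); // save first ...
        pending.clear();
        latestAcker.ack(latestTag, true);       // ... then ack everything up to latestTag
    }

    public static void main(String[] args) throws IOException {
        BatchAckBuffer<String> buffer = new BatchAckBuffer<String>(2,
                batch -> System.out.println("saving " + batch));
        Acker acker = (tag, multiple) -> System.out.println("ack up to " + tag);
        buffer.add("a", 1L, acker);
        buffer.add("b", 2L, acker);
        buffer.add("c", 3L, acker); // third payload exceeds the limit of 2 and triggers a flush
        buffer.add("d", 4L, acker);
        buffer.flush();             // what the idle-event handler would do
    }
}
```

Saving before acking means that if the save throws, nothing is acked and the broker can redeliver the batch, at the cost of possibly saving a message twice.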


Solution

  • Not currently, but it wouldn't be hard to add a feature. Open a GitHub issue to request it. However, I am not sure how useful it would be: if there are still messages in the queue, consuming a prefetched message will cause another one to be fetched.
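To illustrate the last point, here is a toy model of a queue feeding one consumer with a prefetch limit. It is not RabbitMQ or Spring AMQP code; `PrefetchModel` and its methods are made up for this sketch, and it assumes the broker tops the consumer back up as soon as a message is acknowledged.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (an assumption, not broker code): one queue, one consumer, fixed prefetch. */
class PrefetchModel {
    private final Deque<Integer> queue = new ArrayDeque<>();   // messages still on the broker
    private final Deque<Integer> unacked = new ArrayDeque<>(); // delivered but not yet acked
    private final int prefetch;

    PrefetchModel(int messages, int prefetch) {
        this.prefetch = prefetch;
        for (int i = 1; i <= messages; i++) {
            queue.add(i);
        }
        refill();
    }

    /** The broker pushes messages until the consumer holds `prefetch` unacked ones. */
    private void refill() {
        while (unacked.size() < prefetch && !queue.isEmpty()) {
            unacked.add(queue.poll());
        }
    }

    /** The consumer acks its oldest message; the broker immediately tops it up again. */
    void ackOne() {
        unacked.poll();
        refill();
    }

    int onQueue() { return queue.size(); }

    int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        PrefetchModel model = new PrefetchModel(5, 2);
        while (model.unackedCount() > 0) {
            System.out.println("on queue: " + model.onQueue()
                    + ", prefetched: " + model.unackedCount());
            model.ackOne();
        }
    }
}
```

In this model the prefetched count stays at the prefetch limit until the queue is empty, so it says little about how much work is left; only once the queue is drained does it start to fall.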