I need to know if there are more messages comming for this consumer.
Right now I count the messages on the queue. But that give me only what is left on the queue and not what has been prefetched.
@RabbitListener(queues = QUEUENAME)
public void recieve(Message message, Channel channel) throws IOException {
long messagesOnQueue = channel.messageCount(QUEUENAME);
if(messagesOnQueue>1) {
//add message to list
}
else {
//save the list
}
}
It would be really great If there was a way to tell if messages was prefetched for this consumer. Is that possible? If I can get that count then I dont care if there are messages on the queue as well.
After recieving suggestions from Gary I have changed the implementation to this, and it works. When manually acknowledging a message it has to be done on the same channel as you get the message. But you can save a reference to it in case you need it in another thread. In your spring boot application.yml add this
spring:
rabbitmq:
listener:
direct:
prefetch: 200
simple:
prefetch: 200
acknowledgeMode: MANUAL
Code from the consumer.
//The list we build and save in one transaction
private Set<PayloadDto> unhandledPayloads = new HashSet<>();
private long latestTag = 0L;
private Channel latestChannel;
@RabbitListener(queues = QUEUE_NAME, id = "consumerId")
public void recieve(Message message, Channel channel) throws IOException {
PayloadDto payloadDto = parse(message.getBody());
unhandledPayloads.add(payloadDto);
latestTag = message.getMessageProperties().getDeliveryTag();
latestChannel = channel;
if (unhandledPayloads.size() > UNHANDLED_PAYLOADS_LIMIT) {
service.createOrUpdate(unhandledPayloads);
queue.clear();
channel.basicAck(latestTag, true);
}
}
@EventListener(condition = "event.listenerId == 'consumerId'")
public void onApplicationEvent(ListenerContainerIdleEvent event) {
if(!queue.isEmpty()) {
service.createOrUpdate(unhandledPayloads);
queue.clear();
latestChannel.basicAck(latestTag, true);
}
}
The reason we are trying to build up a list before saving it is to be able to do batch insert to make it run faster.
Not currently, but it wouldn't be hard to add a feature. Open a github issue to request it. However, I am not sure how useful it would be. If there are still messages in the queue, consuming a prefetched will fetch another.