I'm scratching my head with something here. I need to enable an app to have a single consumer for a queue. So my first gut reaction was to do this:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
return factory;
}
As I sent some messages to the queue for testing, I did notice that only one listener was active at a time, but I noticed this in the logs too:
2024-02-29T16:50:54.338-05:00 DEBUG 49772 --- [pool-2-thread-9] o.s.a.r.listener.BlockingQueueConsumer : Storing delivery for consumerTag: 'amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q' with deliveryTag: '38' in Consumer@25de8898: tags=[[amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.16.17.220:5672/,1), conn: Proxy@ad0bb4e Shared Rabbit Connection: SimpleConnection@19705650 [delegate=amqp://guest@172.16.17.220:5672/, localPort=65348], acknowledgeMode=MANUAL local queue size=30
2024-02-29T16:50:54.466-05:00 DEBUG 49772 --- [ool-2-thread-10] o.s.a.r.listener.BlockingQueueConsumer : Storing delivery for consumerTag: 'amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q' with deliveryTag: '39' in Consumer@25de8898: tags=[[amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@172.16.17.220:5672/,1), conn: Proxy@ad0bb4e Shared Rabbit Connection: SimpleConnection@19705650 [delegate=amqp://guest@172.16.17.220:5672/, localPort=65348], acknowledgeMode=MANUAL local queue size=31
And while looking at the Rabbit console I also noticed that messages were in unack state, but not ready.
So I guess the BlockingQueueConsumer
does remove them from the queue, and controls concurrency internally. Is there a way to actually force just one message being pulled at a time using Annotations? Or do I have to switch to a manual poll to achieve that?
Thanks folks
For that purpose you must not use factory.setMaxConcurrentConsumers(1);
.
Leave it as-is. There is the logic in the AsyncMessageProcessingConsumer.mainLoop()
:
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
checkAdjust(receivedOk);
}
So, we really do try to start a new BlockingQueueConsumer
if that property is not null
.
You can find much more info in JavaDocs of the concurrency
props in the SimpleMessageListenerContainer
.
There is also this one which might be some kind of interest for your use-case:
/**
* Set to true for an exclusive consumer - if true, the concurrency must be 1.
* @param exclusive true for an exclusive consumer.
*/
@Override
public final void setExclusive(boolean exclusive) {
There is also some info in docs: https://docs.spring.io/spring-amqp/reference/amqp/listener-concurrency.html
You might also be interested in this option to be set to 1
:
/**
* Tell the broker how many messages to send to each consumer in a single request.
* Often this can be set quite high to improve throughput.
* @param prefetchCount the prefetch count
* @see com.rabbitmq.client.Channel#basicQos(int, boolean)
*/
public void setPrefetchCount(int prefetchCount) {