
How do I force a single consumer in an annotation-driven RabbitMQ Spring application?


I'm scratching my head with something here. I need to enable an app to have a single consumer for a queue. So my first gut reaction was to do this:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        return factory;
    }

As I sent some messages to the queue for testing, I did notice that only one listener was active at a time, but I noticed this in the logs too:

2024-02-29T16:50:54.338-05:00 DEBUG 49772 --- [pool-2-thread-9] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for consumerTag: 'amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q' with deliveryTag: '38' in Consumer@25de8898: tags=[[amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@ad0bb4e Shared Rabbit Connection: SimpleConnection@19705650 [delegate=amqp://[email protected]:5672/, localPort=65348], acknowledgeMode=MANUAL local queue size=30
2024-02-29T16:50:54.466-05:00 DEBUG 49772 --- [ool-2-thread-10] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for consumerTag: 'amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q' with deliveryTag: '39' in Consumer@25de8898: tags=[[amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@ad0bb4e Shared Rabbit Connection: SimpleConnection@19705650 [delegate=amqp://[email protected]:5672/, localPort=65348], acknowledgeMode=MANUAL local queue size=31

While looking at the RabbitMQ management console, I also noticed that the messages were in the unacked state, not ready.

So I guess the BlockingQueueConsumer does remove them from the queue and controls concurrency internally. Is there a way to force just one message to be pulled at a time using annotations? Or do I have to switch to manual polling to achieve that?

Thanks folks


Solution

  • For that purpose you must not call factory.setMaxConcurrentConsumers(1). Leave that property unset. There is this logic in AsyncMessageProcessingConsumer.mainLoop():

    boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
    if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
        checkAdjust(receivedOk);
    }
    

    So, we really do try to start a new BlockingQueueConsumer if that property is not null.

    You can find much more info in the JavaDocs of the concurrency properties in SimpleMessageListenerContainer.

    There is also this option, which might be of interest for your use case:

    /**
     * Set to true for an exclusive consumer - if true, the concurrency must be 1.
     * @param exclusive true for an exclusive consumer.
     */
    @Override
    public final void setExclusive(boolean exclusive) {
    

    There is also some info in the docs: https://docs.spring.io/spring-amqp/reference/amqp/listener-concurrency.html
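
    With annotations, the exclusive flag can be requested on the listener itself through the exclusive attribute of @RabbitListener. The following is only a minimal sketch: the class name, the queue name "my.queue", and the String payload are placeholders that do not come from the question, and it assumes the container concurrency stays at 1, as exclusive consumers require.

    ```java
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    // Hypothetical listener class; the name and the queue are illustrative only.
    @Component
    public class SingleConsumerListener {

        // exclusive = true asks the broker to allow only this consumer on the queue.
        // It requires a concurrency of 1 for the listener container.
        @RabbitListener(queues = "my.queue", exclusive = true)
        public void handle(String payload) {
            System.out.println("Received: " + payload);
        }
    }
    ```

    Note that an exclusive consumer also keeps other application instances from consuming the same queue, which may or may not be what you want.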

    You might also be interested in this option to be set to 1:

    /**
     * Tell the broker how many messages to send to each consumer in a single request.
     * Often this can be set quite high to improve throughput.
     * @param prefetchCount the prefetch count
     * @see com.rabbitmq.client.Channel#basicQos(int, boolean)
     */
    public void setPrefetchCount(int prefetchCount) {
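
    Putting these points together, the factory from the question could look roughly like the sketch below. It keeps concurrentConsumers at 1, leaves maxConcurrentConsumers unset, and sets the prefetch count to 1 so that the broker delivers only one unacknowledged message at a time, rather than filling the consumer's local queue as seen in the "local queue size=30" log entries. The class name RabbitConfig and the Jackson2JsonMessageConverter bean are assumptions that stand in for the question's jsonConverter() method, which is not shown there.

    ```java
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    // Hypothetical configuration class; only the factory bean mirrors the question.
    @Configuration
    public class RabbitConfig {

        // Assumed stand-in for the question's jsonConverter() method.
        @Bean
        public MessageConverter jsonConverter() {
            return new Jackson2JsonMessageConverter();
        }

        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
                ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(jsonConverter());
            // One consumer; maxConcurrentConsumers is deliberately left unset (null).
            factory.setConcurrentConsumers(1);
            // At most one unacknowledged message is delivered to the consumer at a time.
            factory.setPrefetchCount(1);
            return factory;
        }
    }
    ```

    With a prefetch of 1, the remaining messages should stay in the ready state on the broker until the current one is acknowledged, at the cost of lower throughput.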