java  spring-boot  rabbitmq  spring-amqp  spring-rabbit

Consuming a queue based on its consumer count in Spring AMQP


I want a queue to be consumed by only one subscriber at a time. If that subscriber drops, another one should get the chance to subscribe.

I am looking for the correct way to do this in Spring AMQP. I did it in pure Java, based on the example on RabbitMQ's website: I passively declare the queue, check its consumer count, and start consuming only if the count is 0.

Here's the code.

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

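// A passive declare does not create the queue; it only reports its current state.
// It throws an IOException if the queue does not exist.
int count = channel.queueDeclarePassive(QUEUE_NAME).getConsumerCount();

System.out.println("count is " + count);
if (count == 0) {
    // Note: another client may start consuming between the check above and basicConsume below.
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);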

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} else {
    System.out.println("subscribed by some other processor(s)");
}

I can also check the consumer count in Spring AMQP this way, but by then it is too late, because the listener is already consuming from the queue.

@RabbitListener(queues = "q1")
public void receivedMessageQ1(String message, Channel channel){
    try {
        int q1 = channel.queueDeclarePassive("q1").getConsumerCount();
        // do something.
    } catch (IOException e) {
        System.out.println("exception occurred");
    }
}

In a nutshell, I want to start consuming from a queue only when it has no other consumers. I hope that is clear.


Solution

  • Set the exclusive flag on the @RabbitListener; RabbitMQ will then allow only one instance to consume from the queue. The other instance(s) will retry every 5 seconds (by default). To increase the interval, set the container factory's recoveryBackOff.

    @SpringBootApplication
    public class So56319999Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So56319999Application.class, args);
        }
    
        @RabbitListener(queues = "so56319999", exclusive = true)
        public void listen(String in) {
            // Handle the message here.
        }
    
        @Bean
        public Queue queue() {
            return new Queue("so56319999");
        }
    
    }
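
The recoveryBackOff setting mentioned above could be configured roughly as follows. This is a minimal sketch, not part of the original answer: the class name ListenerRetryConfig and the 30-second interval are assumptions chosen for illustration. It also assumes that declaring a bean named rabbitListenerContainerFactory replaces the one Spring Boot would otherwise auto-configure, in which case any spring.rabbitmq.listener.* properties are no longer applied to it automatically.

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class ListenerRetryConfig {

    // @RabbitListener methods use the factory bean named
    // "rabbitListenerContainerFactory" unless told otherwise.
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Retry the (exclusive) consume every 30 seconds instead of the default 5,
        // with no limit on the number of attempts. The interval is an assumed value.
        factory.setRecoveryBackOff(new FixedBackOff(30_000L, FixedBackOff.UNLIMITED_ATTEMPTS));
        return factory;
    }
}
```

A longer interval means fewer refused consume attempts on the broker, at the cost of a slower takeover when the active instance goes away.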