Search code examples
rabbitmqprefetchconsumer

RabbitMQ prefetch ignored when consumer is Down and gets Up


I'm getting my basicQos ignored when consumer is down and, after, consumer gets up. For instance, suppose that consumer is down and 5 messages arrives from a producer. If consumer is not running, these messages will be stored in disk (I think!) if exchanger/queue is (are) durable.

if I set basicQos as channel.basicQos(0, 3, true), my consumer receives more than 3 messages when it gets UP. Why?!?

On the other hand, everything works properly (only 3 messages are read from the queue) if consumer is running when it receives messages from the queues... My code is as follows:

factory = new ConnectionFactory();
factory.setHost(mRabbitMQHost); //may get server address from file configuration.
factory.setUsername(mRabbitMQUsername); 
factory.setPassword(mRabbitMQPassword);
connection = factory.newConnection();
channel = connection.createChannel();

channel.exchangeDeclare("exchangeName", "direct", true); //True enables durability
consumer = new QueueingConsumer(channel);

for (QGQueues queue : QGQueues.values()) {
    String queueName = queue.getQueueName();
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, "exchangeName", queue.getRoutingKey());
    channel.basicConsume(queueName, false, consumer); //false enables ACK message to RabbitMQ server    
}

channel.basicQos(0, 3, true);

Thanks!


Solution

  • My bet would be that you need to set the QoS before you do anything else.

    Change your code to this order:

    
    
    channel = connection.createChannel();
    
    // set QoS immediately
    channel.basicQos(0, 3, true);
    
    channel.exchangeDeclare("exchangeName", "direct", true); //True enables durability
    consumer = new QueueingConsumer(channel);
    
    for (QGQueues queue : QGQueues.values()) {
        String queueName = queue.getQueueName();
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, "exchangeName", queue.getRoutingKey());
        channel.basicConsume(queueName, false, consumer); //false enables ACK message to RabbitMQ server    
    }
    

    this will ensure the prefetch limit is set before you try to consume any messages.