Search code examples
rabbitmqamqppika

Queue max-length or TTL work with get but not consume


I see many posts asking about limiting queue length. In my experiment with Pika and RabbitMQ, if I declare the queue with arguments={'x-message-ttl': 1000, 'x-max-length': 2, 'x-overflow': 'drop-head'} and even add expiration='1000' to the message properties when it is produced, I can see that all three contribute individually to dropping messages from the queue. My goal is to make sure that the consumer only receives the most recent information.

But, as pointed out here: RabbitMQ messages delivered after x-message-ttl has expired, I can only get it to work using basic_get instead of basic_consume.

basic_get seems to pull messages, sending a request each time. I need to be able wait for the server to push messages, instead of polling it. Isn't a consumer the right way to go? What are the requirements for a consumer to take advantage of x-message-ttl or x-max-length (I tried basic_qos(prefetch_count=1))?


Solution

  • The problem is that autoAck is set to True in the code you linked. If you set autoAck to True, the moment the message is in your buffer, it is seen as received, and RabbitMQ will send you new message.

    This basically means that all your Thread.sleep(300) does is delay how long until the message is displayed. The messages are still being received and put into the buffer, and even worse, if your application crashes while there are still messages in the buffer, all the messages in the buffer are lost.

    If you on the other-hand turned off autoAck, and you had prefetch_count set to 1. RabbitMQ wouldn't send the consumer a new message until the first message is marked as Acknowledged.

    channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(Thread.currentThread().toString() + " - " + new String(body));
                try {
                    Thread.sleep(300);
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    

    This is a lot slower, but it's also a lot safer, and will ensure that you only receive the messages you can handle.