
What is the requeue order in RabbitMQ?


According to the RabbitMQ documentation on Consumer Acknowledgements:

When a message is requeued, it will be placed to its original position in its queue, if possible. If not (due to concurrent deliveries and acknowledgements from other consumers when multiple consumers share a queue), the message will be requeued to a position closer to queue head.

So with a single client consumer, if the server queue is initially

tail [c b a] head

and the client consumer consumes the head message ("a"), the server queue should become:

tail [c b] head

Then if the client consumer nacks the message, it should be requeued at the head of the server queue (its "original position", per the documentation), and the server queue should become:

tail [c b a] head

Finally, the client consumer should consume the same head message ("a") again.

But that is not what I observed using the Python library Pika. What I observed is that nacked messages get requeued at the tail of the server queue, not at the head (the "original position"). Is the RabbitMQ documentation correct, or is the Pika library correct?

Sample code:

import logging

import pika

logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").propagate = False
parameters = pika.ConnectionParameters()

# Produce messages

with pika.BlockingConnection(parameters) as connection:
    queue = "foobar"
    routing_key = queue
    channel = connection.channel()
    channel.queue_declare(queue=queue)
    for body in ["a", "b", "c"]:
        channel.basic_publish(exchange="", routing_key=routing_key, body=body)
        logging.info(
            "Produced message %r with routing key %r", body, routing_key
        )

# Consume messages

def handle(channel, method, properties, body):
    logging.info("Consumed message %r from queue %r", body.decode(), queue)
    # Negatively acknowledge the message; requeue=True (the default)
    # asks the server to put it back in the queue.
    channel.basic_nack(method.delivery_tag, requeue=True)

with pika.BlockingConnection(parameters) as connection:
    queue = "foobar"
    channel = connection.channel()
    channel.queue_declare(queue=queue)
    channel.basic_consume(queue=queue, on_message_callback=handle)
    channel.start_consuming()

Output:

INFO:root:Produced message 'a' with routing key 'foobar'
INFO:root:Produced message 'b' with routing key 'foobar'
INFO:root:Produced message 'c' with routing key 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'b' from queue 'foobar'
INFO:root:Consumed message 'c' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'b' from queue 'foobar'
INFO:root:Consumed message 'c' from queue 'foobar'
INFO:root:Consumed message 'a' from queue 'foobar'
INFO:root:Consumed message 'b' from queue 'foobar'
INFO:root:Consumed message 'c' from queue 'foobar'


Solution

  • The behavior you encountered is most likely due to prefetching.

    As you have not specified the desired quality of service, I believe (a more knowledgeable source confirming this would be welcome) that the prefetch window is left to the server; in practice that means it is effectively unlimited, so the server can push "b" and "c" to the consumer before "a" is nacked. The requeued "a" then goes back to the head of the queue, but "b" and "c" are already in flight, which is why the consumer sees a, b, c, a, b, c, ...

    The idea is that, for performance reasons, a client can receive multiple messages at once, which is favorable in most cases:

    • if the consumer is multithreaded, it can probably process multiple messages in parallel, and so may have multiple unacknowledged messages at any given time
    • to allow more fluid processing in the "happy" case, the client can acknowledge a block of messages at once, letting the server know that every message up to a given delivery tag has been acknowledged; this reduces overhead when a large number of messages each require little processing (see the sketch after this list)
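
    As a minimal sketch of that second point (the handle and process functions here are hypothetical, not part of the question's code), a consumer can acknowledge every tenth delivery with multiple=True, which confirms that message and all earlier unacknowledged deliveries on the channel in a single frame:

    import pika

    def process(body):
        # Hypothetical stand-in for the real message processing.
        print("processing", body)

    def handle(channel, method, properties, body):
        process(body)
        # Delivery tags are sequential per channel, so acking every 10th
        # tag with multiple=True confirms that message and all earlier
        # unacked deliveries at once, reducing per-message ack traffic.
        if method.delivery_tag % 10 == 0:
            channel.basic_ack(delivery_tag=method.delivery_tag, multiple=True)

    with pika.BlockingConnection(pika.ConnectionParameters()) as connection:
        channel = connection.channel()
        channel.queue_declare(queue="foobar")
        channel.basic_consume(queue="foobar", on_message_callback=handle)
        channel.start_consuming()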

    You can control this behavior explicitly by setting the channel's quality of service with basic_qos; the RabbitMQ documentation on consumer acknowledgements and consumer prefetch explains the available options in detail.
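
    As a hedged sketch (assuming the same foobar queue from the question), limiting the prefetch window to a single unacknowledged message should make the requeue order match the documented "original position" behavior, because the server cannot deliver "b" or "c" until "a" is acked or nacked:

    import logging

    import pika

    logging.basicConfig(level=logging.INFO)
    logging.getLogger("pika").propagate = False

    def handle(channel, method, properties, body):
        logging.info("Consumed message %r", body.decode())
        # Requeue the message; with prefetch_count=1 it should be
        # redelivered before the server sends the next queued message.
        channel.basic_nack(method.delivery_tag, requeue=True)

    with pika.BlockingConnection(pika.ConnectionParameters()) as connection:
        channel = connection.channel()
        channel.queue_declare(queue="foobar")
        # Allow at most one unacknowledged message on this channel.
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(queue="foobar", on_message_callback=handle)
        channel.start_consuming()

    With this setting, the consumer should keep receiving "a" over and over, since the single nacked message is requeued at the head and redelivered before "b" is ever sent.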