Suppose I have a queue with five items:
(tail) E, D, C, B, A (head)
I consume messages from the head of this queue, but decide that message A
is not suitable for processing at the present time. I reject
that item with requeue=True
, and the queue becomes:
(tail) A, E, D, C, B (head)
I then consume B
, C
, D
, and E
, ack
ing each one. Now the queue holds only A
, which I continually consume and reject
over and over in a never-ending loop. If a new, non-A
message comes in, it gets consumed almost immediately, then the process resumes its loop of trying to consume A
.
I do this with a slight modification to the Twisted Consumer Example from the Pika docs:
import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol,task
@defer.inlineCallbacks
def run(connection):
channel = yield connection.channel()
exchange = yield channel.exchange_declare(exchange='topic_link',type='topic')
queue = yield channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)
yield channel.queue_bind(exchange='topic_link',queue='hello',routing_key='hello.world')
#yield channel.basic_qos(prefetch_count=1)
queue_object, consumer_tag = yield channel.basic_consume(queue='hello',no_ack=False)
l = task.LoopingCall(read, queue_object)
l.start(0.01)
@defer.inlineCallbacks
def read(queue_object):
ch,method,properties,body = yield queue_object.get()
print body
if body == 'A':
yield ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
else:
yield ch.basic_ack(delivery_tag=method.delivery_tag)
parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('hostname', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run()
The Problem: Note the following commented-out line:
#yield channel.basic_qos(prefetch_count=1)
When I uncomment that, and the consumer reaches message A
, it will immediately pick it up again after reject
ing it, ignoring any other items that may be waiting in the queue behind it. Instead of placing the rejected item onto the tail of the queue, it just keeps trying it again, over and over, completely blocking everything else in the queue.
With the line commented out, it works properly (albeit a bit more slowly). If the line is present and prefetch_count > 1
, it also works. Something about setting it to exactly 1
triggers this behavior.
Is there a step I'm missing in rejecting message A
? Or is Pika's prefetching system fundamentally incompatible with this edge case?
If you have only one consumer then RabbitMQ has no other way than send message to the same consumer it was rejected from (no matter how: with basic.reject or basic.nack).
When you set prefetch_count > 1
then your consumer will have your looped message plus new one from the head next to looped (literally, your looped message will stay at head).
If you get accidentally N*M
looped messages withprefetch_count <= N
and consumers number <= M
you will have all messages being looped (which leads to burned down CPU and so on), so it might be a good catch to check rejected
message flag and have some advanced logic if message was already redelivered.