I have a problem with Pika, the RabbitMQ client library for Python. I want to consume one message from a queue, work on it, and acknowledge it when the work is done. Only then should the next message be received.
I used the prefetch_count=1 option to tell RabbitMQ that this consumer wants only one message at a time and doesn't want a new message until the current one is acknowledged.
Here is my (very simple) code:
import time

import pika

credentials = pika.PlainCredentials("username", "password")
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='1.2.3.4', credentials=credentials))
channel = connection.channel()

def consume(ch, method, properties, body):
    time.sleep(5)  # Here is the work, now just hold 5 seconds
    ch.basic_ack(method.delivery_tag)

def init():
    channel.basic_consume(
        queue="raw.archive", on_message_callback=consume, auto_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()

if __name__ == "__main__":
    init()
So my question is: why does RabbitMQ deliver more messages (40/s) than are acknowledged (0.20/s, which is correct given the 5-second pause)? Shouldn't these two rates be equal? Furthermore, the Unacked value (1650) should never be greater than 1, because RabbitMQ should not deliver another message until the current one has been acknowledged.
The second view shows that the consumer has no prefetch count, even though the prefetch count is set on the channel. Maybe I have to set it on the consumer, but I don't know how to do that.
What am I doing wrong?
Thanks in advance.
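As confirmed by Marcel, the issue is related to when basic_qos is set on the channel. It seems it has to be set before basic_consume is called. As far as I understand, with the default (non-global) setting RabbitMQ applies the prefetch limit only to consumers registered on the channel after the basic_qos call, so a consumer that already exists keeps receiving messages without a limit.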
def init():
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue="raw.archive", on_message_callback=consume, auto_ack=False)
    channel.start_consuming()
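For completeness, here is a minimal self-contained sketch of the same ordering, assuming Pika 1.x. The helper names (make_callback, start_consumer, open_channel, slow_work) are made up for illustration and are not part of Pika; the host, credentials and queue name are the placeholders from the question.

```python
import time


def make_callback(work):
    """Wrap a work function so the message is acked only after it returns."""
    def on_message(ch, method, properties, body):
        work(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    return on_message


def start_consumer(channel, queue_name, work, prefetch=1):
    """Register a consumer with a prefetch limit and block while consuming."""
    # QoS comes first, so the limit is in place before the consumer exists.
    channel.basic_qos(prefetch_count=prefetch)
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=make_callback(work),
        auto_ack=False,
    )
    channel.start_consuming()


def open_channel(host, username, password):
    """Open a blocking connection and return a channel (hypothetical helper)."""
    import pika  # imported here so the helpers above can be exercised without a broker

    credentials = pika.PlainCredentials(username, password)
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=host, credentials=credentials))
    return connection.channel()


def slow_work(body):
    """Stand-in for the real work: just hold for 5 seconds."""
    time.sleep(5)
```

With a reachable broker, start_consumer(open_channel("1.2.3.4", "username", "password"), "raw.archive", slow_work) should then keep the Unacked count at 1 or below, and the consumer view should show the prefetch count.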