Search code examples
pythonrabbitmqpython-asynciokombu

Acknowledged kombu message is not removed from RabbitMQ queue


I am trying to create a Python script that waits for a message on a RabbitMQ queue to start a task in a subprocess. During task execution, the script continues to consume another queue for a cancel order that would stop the task.

I use kombu package to handle interactions with RabbitMQ. I call message.ack() when the task terminates (whether normally or because of cancellation).

Despite the call to message.ack(), the start message is not removed from the queue (I can tell by using rabbitmqctl). This causes the message to be redelivered even if the task ran to completion.

I created a sample repository to show the problem. The README file shows the reproduction steps.

I do not know where the problem could be. I realize there are many moving parts but this is a slightly simplified version of a real project with its own constraints (like the Python version being fixed to 3.8). I am open to any suggestion to make a better use of kombu or asyncio because I am quite new to AMQP and async Python.


Solution

  • I finally found the cause of my problem. Actually there were two causes:

    1. I did not set channels when creating the Consumers, so they were both using the same channel. Since I had set noAck=True for the "stop" consumer, calling message.ack() had no effect. The strange thing is that noAck=True should have caused the ACK to be sent at reception, even for start message, but it was not.
    2. I got mistaken with the closure of the on_done callback. I thought the closure contained the message at the moment it was created, but it wasn't. Instead, it contained the last message received, i.e. the "stop" message. I'm quite new to Python so I am not sure how closures work. I assumed it worked like JavaScript but it seems not. Overall I stored the message when creating the task, gave it back as an argument to the on_done callback and it worked.

    I could not find the solution until I used the debugger to inspect the execution step by step. This shows once again the superiority of the debugger when troubleshooting. I should have gone with it earlier.