I am trying to create a Python script that waits for a message on a RabbitMQ queue to start a task in a subprocess. While the task is running, the script keeps consuming another queue for a cancellation message that would stop the task.
I use the `kombu` package to handle interactions with RabbitMQ, and I call `message.ack()` when the task terminates (whether normally or because of cancellation). Despite the call to `message.ack()`, the start message is not removed from the queue (I can tell by using `rabbitmqctl`). This causes the message to be redelivered even though the task ran to completion.
I created a sample repository to show the problem. The README file shows the reproduction steps.
I do not know where the problem could be. I realize there are many moving parts, but this is a slightly simplified version of a real project with its own constraints (like the Python version being fixed to 3.8). I am open to any suggestion to make better use of `kombu` or `asyncio`, because I am quite new to AMQP and async Python.
I finally found the cause of my problem. Actually there were two causes:

- I had set `noAck=True` for the "stop" consumer, so calling `message.ack()` had no effect. The strange thing is that `noAck=True` should have caused the ACK to be sent at reception, even for the start message, but it was not.
- A closure bug in the `on_done` callback. I thought the closure captured `message` at the moment it was created, but it didn't: Python closures capture variables, not values, so by the time the callback ran it saw the last message received, i.e. the "stop" message. I'm quite new to Python and assumed closures worked like in JavaScript, but apparently not. In the end I stored the message when creating the task, passed it back as an argument to the `on_done` callback, and it worked.

I could not find the solution until I used the debugger to inspect the execution step by step. This shows once again the value of a debugger when troubleshooting; I should have reached for it earlier.
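The late-binding behavior and the fix can be reproduced without RabbitMQ. This is a minimal sketch: `FakeMessage` is a hypothetical stand-in for a kombu message (only `ack()` matters), and `on_done` mimics the shape of my callback; the real project's names may differ.

```python
import asyncio
from functools import partial


class FakeMessage:
    """Hypothetical stand-in for a kombu message; only ack() matters here."""
    def __init__(self, body):
        self.body = body
        self.acked = False

    def ack(self):
        self.acked = True


# 1) The bug: a closure reads `message` when it is *called*, not when it is
# created, so every callback sees whatever the variable holds at call time.
callbacks = []
for message in (FakeMessage("start"), FakeMessage("stop")):
    callbacks.append(lambda: message.body)

print([cb() for cb in callbacks])  # ['stop', 'stop'] — both see the last message


# 2) The fix: bind the current message explicitly when creating the callback,
# so the "start" callback always acks the "start" message.
def on_done(message, task):
    message.ack()  # acks the message bound at task-creation time


async def main():
    start = FakeMessage("start")
    task = asyncio.ensure_future(asyncio.sleep(0))
    # partial() freezes `start` into the callback now, not when it fires:
    task.add_done_callback(partial(on_done, start))
    await task
    await asyncio.sleep(0)  # give the done-callback a chance to run
    return start.acked


print(asyncio.run(main()))  # True
```

`functools.partial` is one way to do the binding; a lambda default argument (`lambda t, m=message: m.ack()`) achieves the same thing.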