Using RabbitMQ and pika (Python), I am running a job queuing system that feeds nodes (asynchronous consumers) with tasks. Each message that defines a task is only acknowledged once that task is completed.
Sometimes I need to perform updates on these nodes, so I have created an exit mode in which a node waits for its running tasks to finish and then exits gracefully. I can then perform my maintenance work.
So that a node does not get more messages from RabbitMQ while in this exit mode, I let it call the basic_cancel method before waiting for the jobs to finish.
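For reference, the cancellation itself is just a call on the channel, roughly like this (a minimal sketch in the spirit of the pika async example; the callback and consumer-tag attribute names are whatever your consumer class uses, and note that the basic_cancel signature changed between pika 0.x and 1.x):
def enter_exit_mode(self):
    # Stop receiving new deliveries but keep the channel and connection open
    # so the in-flight tasks can still be acknowledged later.
    if self._channel:
        # pika 0.x: basic_cancel(callback, consumer_tag)
        # pika 1.x: basic_cancel(consumer_tag, callback=...)
        self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)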
The effect of this method is described in the pika documentation:
This method cancels a consumer. This does not affect already
delivered messages, but it does mean the server will not send any more
messages for that consumer. The client may receive an arbitrary number
of messages in between sending the cancel method and receiving the
cancel-ok reply. It may also be sent from the server to the client in
the event of the consumer being unexpectedly cancelled (i.e. cancelled
for any reason other than the server receiving the corresponding
basic.cancel from the client). This allows clients to be notified of
the loss of consumers due to events such as queue deletion.
So if you read "already delivered messages" as messages already received but not necessarily acknowledged, then the tasks that exit mode waits for should not be requeued, even though the consumer node running them cancels itself out of the queuing system.
My code for the stop function of my async consumer class (adapted from the pika example) is similar to this:
def stop(self):
    """Cleanly shut down the connection to RabbitMQ by stopping the consumer
    with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok
    will be invoked by pika, which will then close the channel and
    connection. The IOLoop is started again because this method is invoked
    when CTRL-C is pressed, raising a KeyboardInterrupt exception. This
    exception stops the IOLoop, which needs to be running for pika to
    communicate with RabbitMQ. All of the commands issued prior to starting
    the IOLoop will be buffered but not processed.
    """
    LOGGER.info('Stopping')
    self._closing = True
    self.stop_consuming()
    LOGGER.info('Waiting for all running jobs to complete')
    for index, thread in enumerate(self.threads):
        if thread.is_alive():
            thread.join()
            # also tried with a while loop that waits 10s as long as the
            # thread is still alive
        LOGGER.info('Thread {} has finished'.format(index))
    # also tried moving the call to stop_consuming up to this point
    if self._connection is not None:
        self._connection.ioloop.start()
    LOGGER.info('Closing connection')
    self.close_connection()
My issue is that after the consumer cancellation, the async consumer no longer appears to send heartbeats, even if I perform the cancellation after the loop where I wait for my tasks (threads) to finish.
I have read about a process_data_events function for BlockingConnection, but I could not find such a function. Is the ioloop of the SelectConnection class the equivalent for an async consumer?
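From what I understand (a rough sketch, not verified on my setup), with a BlockingConnection you would have to pump events yourself while waiting for the threads, and the SelectConnection equivalent would simply be keeping the ioloop running:
# BlockingConnection: heartbeats are only serviced while you pump events.
while any(t.is_alive() for t in threads):
    connection.process_data_events(time_limit=1)  # handle I/O (incl. heartbeats) for up to 1s

# SelectConnection: there is no process_data_events; I/O (and heartbeats)
# are handled as long as connection.ioloop.start() is running.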
As my node in exit mode no longer sends heartbeats, the tasks it is currently performing will be requeued by RabbitMQ once the heartbeat timeout is reached. I would like to keep this heartbeat setting untouched, as it is not an issue outside of exit mode (my heartbeat here is about 100s, and my tasks may take as much as 2 hours to complete).
Looking at the RabbitMQ logs, the heartbeat is indeed the reason:
=ERROR REPORT==== 12-Apr-2017::19:24:23 ===
closing AMQP connection (.....) :
missed heartbeats from client, timeout: 100s
The only workaround I can think of is acknowledging the messages corresponding to the tasks still running when in exit mode, and hoping that these tasks will not fail...
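As a sketch, that early-ack workaround would look something like this (assuming I keep my own set of unacknowledged delivery tags, here called self._unacked_tags, which is not shown in the code above):
# Hypothetical early-ack workaround: acknowledge everything still in flight
# before waiting. Risky: if a task fails afterwards, its message is lost
# instead of being requeued.
for delivery_tag in list(self._unacked_tags):
    self._channel.basic_ack(delivery_tag)
    self._unacked_tags.discard(delivery_tag)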
Is there any method on the channel or connection that I can use to send some heartbeats manually while waiting?
Could the issue be that time.sleep() or thread.join() (from the Python threading package) block completely and do not allow other threads to do what they need? I use them in other applications and they do not seem to behave like that.
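If that is the case, a possible non-blocking alternative would be to poll the worker threads from the ioloop instead of joining them, so that pika keeps servicing heartbeats between checks (a sketch only; add_timeout is the pika 0.x API, pika 1.x uses connection.ioloop.call_later instead):
def wait_for_threads(self):
    # Re-check the worker threads every few seconds without blocking the
    # ioloop, so heartbeats keep being sent while the tasks finish.
    if any(t.is_alive() for t in self.threads):
        self._connection.add_timeout(5, self.wait_for_threads)
    else:
        LOGGER.info('All jobs finished, closing connection')
        self.close_connection()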
As this issue only appears in exit mode, I guess something in the stop function causes the consumer to stop sending heartbeats, but since I have also tried (without any success) calling the stop_consuming method only after the wait-on-running-tasks loop, I don't see what the root of this issue could be.
Thanks a lot for your help!
Turns out the stop_consuming function was calling basic_cancel asynchronously, with a callback that ends up closing the channel (channel.close()), so my application stopped all RabbitMQ interaction and RabbitMQ requeued the unacked messages. I actually realized this because the threads that later tried to acknowledge the remaining tasks hit an error: the channel had been set to None and therefore no longer had an ack method.
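One way to avoid this, sketched below with the same structure as the pika async example (the _check_jobs_and_close helper is hypothetical, and add_timeout is the pika 0.x API), is to stop closing the channel in on_cancelok and only close it once the worker threads are done:
def on_cancelok(self, unused_frame):
    # RabbitMQ confirmed the basic_cancel. Unlike the pika example, do NOT
    # close the channel here: the running tasks still need it to ack.
    LOGGER.info('Consumer cancelled, waiting for running jobs before closing')
    self._check_jobs_and_close()

def _check_jobs_and_close(self):
    # Poll the worker threads from the ioloop; close only once they are done.
    if any(t.is_alive() for t in self.threads):
        self._connection.add_timeout(5, self._check_jobs_and_close)
    else:
        self._channel.close()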
Hope it helps someone!