Search code examples
pythonrabbitmqtornadopika

How to detect a queue has been deleted?


When I manually delete the queue that my PikaClient consumes, nothing happen. I can recreate the queue with the same name but the channel has stopped to consume the queue (normal because I have deleted it). But I would like to receive an event when the consumed queue has been deleted.

I expected that the channel would be automatically closed but «on_channel_close_callback» is never called. «basic_consume» does not provide any callback on close. An additional important point, I have to use the TornadoConnection.

Pika: 0.10.0 Python: 2.7 Tornado: 4.3

Thank you, for your help.

class PikaClient(object):

    def __init__(self):
        # init everything here

    def connect(self):
        pika.adapters.tornado_connection.TornadoConnection(connection_param, on_open_callback=self.on_connected)

    def on_connected(self, connection):
        self.logger.info('PikaClient: connected to RabbitMQ')
        self.connected = True
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_open_error_callback(self, *args):
        self.logger.error("on_open_error_callback")

    def on_channel_open(self, channel):
        channel.add_on_close_callback(self.on_channel_close_callback)

        channel.basic_consume(self.on_message, queue=self.queue_name, no_ack=True)

    def on_channel_close_callback(self, reply_code, reply_text):
        self.logger.error("Consumer was cancelled remotely, shutting down", reply_code=reply_code, reply_text=reply_text)

Solution

  • I have found a workaround. I checking every X sec if my PikaClient has consumed messages. If not I restart the application that will create automatically a queue.

    If you have a better solution, I am still open for suggestions.

    def __init__(self):
        ...
        self.have_messages_been_consumed = False
    
    def on_connected(self, connection):
        self.logger.info('PikaClient: connected to RabbitMQ')
        self.connected = True
        self.connection = connection
        self.connection.add_timeout(X, self.check_if_messages_have_been_consumed)
        self.connection.channel(self.on_channel_open)
    
    def check_if_messages_have_been_consumed(self):
        if self.have_messages_been_consumed:
            self.have_messages_been_consumed = False
            self.connection.add_timeout(X, self.check_if_messages_have_been_consumed)
        else:
            # close_and_restart will set to False have_messages_been_consumed
            self.close_and_restart()
    
    def on_message(self, channel, basic_deliver, header, body):
        self.have_messages_been_consumed = True
        ...