Search code examples
pythonrabbitmqamqppika

Stop channel.basic_consume if the connection is idle/Not consuming from long time


I am having a use case in which i want get the last idle time(last message processed time) of a pika consumer(pika.BlockingConnection).
Usecase:
If the last processed time is greater than Threshold time(ex: 1 hr). I want the consumer to get exited or have a callback method to decide on what i need to do? Like sending a notification to a user.

Is there any way to do this?


Solution

  • pika supports a timeout callback.

    You could add this callback at the end of each message receipt, keeping the reference to it, and removing it at the start of each message receipt.

    def close_connec():
        # close here
    
    timer_id = None
    
    def on_message(chan, method, props, body):
        if timer_id is not None:
            chan.connection.remove_timeout(timer_id)
        # process message
        timer_id = chan.connection.add_timeout(3600, close_connec)