Search code examples
pythonrabbitmqpika

Close connection in readable queue python


When the microservice takes a message from RabbitMQ and the data is processed for a long time, and the connection is closed with the queue that is on the wiretap

Traceback (most recent call last):
  File "/home/saturn/Logic/MAIN_1.py", line 200, in <module>
    channel.start_consuming()
  File "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", line 1780, in start_consuming
    self.connection.process_data_events(time_limit=None)
  File "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", line 707, in process_data_events
    self._flush_output(common_terminator)
  File "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", line 474, in _flush_output
result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')") 

Now task processing near 5 minutes. Main code like -

credentials = pika.PlainCredentials(username='NAME',password='PASSWORD')
ConnParr = pika.ConnectionParameters(host='HOST', credentials=credentials)
connection = pika.BlockingConnection(ConnParr)
channel = connection.channel()

def callback(ch, method, properties, body):
    in_data = json.loads(body.decode('utf-8'))
    main(in_data)
    ch.basic_ack(delivery_tag=method.delivery_tag)

def main(in_data):
    time.sleep(300)

channel.queue_declare(queue=IN_QUEUE)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=IN_QUEUE)
channel.start_consuming()     

Solution

  • This is happening because the time.sleep call blocks your main thread and prevents Pika from sending and receiving heartbeat messages from RabbitMQ. You have a couple ways to fix this:

    • Upgrade to Pika 0.12.0, run your main method in a separate thread, and use add_callback_threadsafe in that thread to call basic_ack on the channel docs.

    • Use the asynchronous consumer example as a starting point for your code.

    The important part to remember is that you can't block Pika's internal event loops and expect the connection to stay alive.