I have a RabbitMQ task queue and a Pika consumer to consume these tasks (with acks). The problem is that the connection dies after 90 seconds Idle but my tasks will often take longer than that. That means that while tasks are still being computed they are returned to the task queue and never acked.
Using RabbitMQ 3.5.3 and Pika 0.9.14 with the channel.basic_consume() method. The connection has a heartbeat_interval of 30 seconds.
Consume code:
import pika
from time import sleep
RABBITMQ_URL = "amqp://user:pass@my-host.com/my_virtual_host?heartbeat_interval=30"
QUEUE_NAME = "my_queue"
def callback(ch, method, properties, body):
print body
sleep(91) # if sleep value < 90 this code works (even 89)
ch.basic_ack(delivery_tag=method.delivery_tag)
parameters = pika.URLParameters(RABBITMQ_URL)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME, durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=QUEUE_NAME)
channel.start_consuming()
Traceback:
Traceback (most recent call last):
File "main.py", line 19, in <module>
channel.basic_consume(callback, queue=QUEUE_NAME)
File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 221, in basic_consume
{'consumer_tag': consumer_tag})])
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1143, in _rpc
self.connection.process_data_events()
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 240, in process_data_events
if self._handle_read():
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 347, in _handle_read
if self._read_poller.ready():
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 43, in inner
return f(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 89, in ready
self.poll_timeout)
select.error: (9, 'Bad file descriptor')
The problem here is that because you are sleeping for so long, pika cannot respond to heartbeat requests from RabbitMQ, and when this happens, RabbitMQ will close the connection.
The only way around this is to either disable heartbeats or sleep in smaller intervals and run process_data_events()
continuously so that pika can handle the heartbeats.
e.g. something like this
def amqp_sleep(connection, time_to_sleep=20):
remaining = time_to_sleep
while remaining > 0:
connection.process_data_events()
time.sleep(5)
remaining -= 5
Personally though I would go for a library that automatically handles the heartbeats in the background so you don't have to deal with them, e.g. rabbitpy or my own amqp-storm.