python, rabbitmq, pika

pika with celery, connection closed


I'm using Celery with RabbitMQ to run some tasks. Sometimes I need to send a message from the workers back to RabbitMQ, so I'm using pika.

I'm currently using BlockingConnection() to connect to RabbitMQ, but after a while I get a "Connection Lost" exception.

I believe this happens because Celery is asynchronous and I'm using BlockingConnection().

This is my code:

import pika

# RABBITMQ_OUT_NAME (the output queue name) is defined elsewhere.

class RabbitConnection(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        # Durable queue: the queue itself survives a broker restart.
        self.channel.queue_declare(queue=RABBITMQ_OUT_NAME, durable=True)
        # Turn on publisher confirms for this channel.
        self.channel.confirm_delivery()

    def add_alert(self, new_alert):
        message = new_alert.to_json()
        # delivery_mode=2 marks the message as persistent.
        delivered = self.channel.basic_publish(exchange='',
                                               routing_key=RABBITMQ_OUT_NAME,
                                               body=message,
                                               properties=pika.BasicProperties(
                                                   delivery_mode=2,
                                                   content_type='application/json',
                                               ))
        return delivered

Should I use a different connection? If so, how should I use it?


Solution

  • It sounds like this could be a threading issue. You can use Pika from multiple threads, but its connections are not thread-safe, so you should have one connection per thread or guard a shared connection with a lock. Instead of adding that complexity to your code, I would recommend a thread-safe library such as amqp-storm or rabbitpy.

    If you implemented this using my AMQP-Storm library, the code would look something like this.

    import amqpstorm
    
    class RabbitConnection(object):
        def __init__(self):
            self.connection = amqpstorm.Connection('localhost', 'guest', 'guest')
            self.channel = self.connection.channel()
            self.channel.queue.declare(queue=RABBITMQ_OUT_NAME, durable=True)
            self.channel.confirm_deliveries()
    
        def add_alert(self, new_alert):
            message = new_alert.to_json()
            # delivery_mode 2 marks the message as persistent.
            delivered = self.channel.basic.publish(exchange='',
                                                   routing_key=RABBITMQ_OUT_NAME,
                                                   body=message,
                                                   properties={
                                                       'delivery_mode': 2,
                                                       'content_type': 'application/json',
                                                   })
            return delivered
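
If you would rather stay with Pika, the "one connection per thread" idea mentioned above could be sketched roughly as follows. This is only an illustration, not part of Pika or AMQP-Storm: the class name PerThreadConnections and the connection_factory argument are made up for this example, and the factory is assumed to be any zero-argument callable that opens a new connection.

```python
import threading


class PerThreadConnections(object):
    """Hand each thread its own connection, created lazily on first use."""

    def __init__(self, connection_factory):
        # connection_factory is a hypothetical zero-argument callable, e.g.
        # lambda: pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self._factory = connection_factory
        # threading.local keeps a separate set of attributes for every thread.
        self._local = threading.local()

    def get(self):
        # Reuse this thread's connection if it already has one.
        connection = getattr(self._local, 'connection', None)
        if connection is None:
            # First call from this thread: open a connection just for it.
            connection = self._factory()
            self._local.connection = connection
        return connection
```

Each worker thread would then call get() before publishing, so no two threads ever touch the same connection object. The sketch does not check whether a stored connection is still open; a real version would probably need to detect a dropped connection and create a new one.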