
Python and RabbitMQ - Best way to listen for and consume events from multiple channels?


I have two separate RabbitMQ instances. I'm trying to find the best way to listen to events from both.

For example, I can consume events on one with the following:

import pika

def callback_func(channel, method, properties, body):
    print(body)

credentials = pika.PlainCredentials(user, password)  # user/password defined elsewhere
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(queue="", exclusive=True)
channel.queue_bind(queue=result.method.queue, exchange="my-exchange", routing_key="*.*.*.*.*")
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
channel.start_consuming()

I have a second host, "host2", that I'd like to listen to as well. I thought about creating two separate threads to do this, but from what I've read, pika isn't thread safe. Is there a better way? Or would two separate threads, each listening to a different Rabbit instance (host1 and host2), be sufficient?


Solution

  • The answer to "what is the best way" depends heavily on how you use your queues and what you mean by "best". Since I can't comment on questions yet, I'll just suggest some possible solutions.

    In each example I'm going to assume the exchange is already declared.
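    Every example below binds with the key *.*.*.*.*. Assuming my-exchange is a topic exchange (the question doesn't say which type it is), * matches exactly one dot-separated word and # matches zero or more, so this binding only receives messages whose routing key has exactly five words. The hypothetical topic_matches function below is a simplified pure-Python illustration of those rules; the real matching happens on the broker, and the sample routing keys are made up.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches binding_key under topic-exchange
    rules: '*' matches exactly one word, '#' matches zero or more words.
    Hypothetical helper for illustration; RabbitMQ does this server-side."""
    return _match(binding_key.split("."), routing_key.split("."))


def _match(pattern, words):
    if not pattern:
        # Pattern exhausted: it matches only if every word was consumed too.
        return not words
    head, rest = pattern[0], pattern[1:]
    if head == "#":
        # '#' either matches zero words here, or swallows one word and
        # stays at the front of the pattern to try matching more.
        return _match(rest, words) or (bool(words) and _match(pattern, words[1:]))
    if not words:
        return False
    # '*' matches any single word; anything else must match literally.
    return (head == "*" or head == words[0]) and _match(rest, words[1:])


print(topic_matches("*.*.*.*.*", "eu.prod.orders.created.v1"))  # True
print(topic_matches("*.*.*.*.*", "eu.prod.orders.created"))     # False
```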

    Threads

    You can consume messages from two queues on separate hosts in a single process using pika.

    You are right: as its own FAQ states, pika is not thread safe, but it can be used in a multi-threaded manner by creating a separate connection to the RabbitMQ host in each thread. Running this example in threads with the threading module looks as follows:

    import pika
    import threading
    
    
    class ConsumerThread(threading.Thread):
        def __init__(self, host, *args, **kwargs):
            super(ConsumerThread, self).__init__(*args, **kwargs)
    
            self._host = host
    
        # Not necessarily a method.
        def callback_func(self, channel, method, properties, body):
            print("{} received '{}'".format(self.name, body))
    
        def run(self):
            credentials = pika.PlainCredentials("guest", "guest")
    
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=self._host,
                                          credentials=credentials))
    
            channel = connection.channel()
    
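            # queue="" asks the broker to generate a name for the exclusive queue.
            result = channel.queue_declare(queue="", exclusive=True)
    
            channel.queue_bind(queue=result.method.queue,
                               exchange="my-exchange",
                               routing_key="*.*.*.*.*")
    
            # pika < 1.0 signature; pika 1.x uses
            # basic_consume(queue, on_message_callback, auto_ack=True).
            channel.basic_consume(self.callback_func,
                                  result.method.queue,
                                  no_ack=True)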
    
            channel.start_consuming()
    
    
    if __name__ == "__main__":
        threads = [ConsumerThread("host1"), ConsumerThread("host2")]
        for thread in threads:
            thread.start()
    

    I've declared callback_func as a method purely so it can use ConsumerThread.name when printing the message body. It could just as well be a function outside the ConsumerThread class.
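    If callback_func lives outside the class, one way to keep the per-thread label is to read threading.current_thread().name. This works because a BlockingConnection dispatches consumer callbacks from inside start_consuming(), on the thread that called it. The sketch below only simulates pika calling the callback on two named threads; the return value exists just to make the function easy to check, pika ignores it.

```python
import threading


def callback_func(channel, method, properties, body):
    # pika runs this inside start_consuming(), i.e. on the consumer thread
    # itself, so current_thread().name tells you which host's thread got it.
    message = "{} received '{}'".format(threading.current_thread().name, body)
    print(message)
    return message


if __name__ == "__main__":
    # Simulation only: call the callback on two threads named after hosts.
    for host in ("host1", "host2"):
        worker = threading.Thread(target=callback_func,
                                  args=(None, None, None, "hello"),
                                  name=host)
        worker.start()
        worker.join()
```

    To get host names rather than the default generated thread names, pass name through to threading.Thread, e.g. ConsumerThread("host1", name="host1"); ConsumerThread already forwards **kwargs to the Thread constructor.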

    Processes

    Alternatively, you can simply run one consumer process per queue you want to consume events from.

    import pika
    import sys
    
    
    def callback_func(channel, method, properties, body):
        print(body)
    
    
    if __name__ == "__main__":
        credentials = pika.PlainCredentials("guest", "guest")
    
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=sys.argv[1],
                                      credentials=credentials))
    
        channel = connection.channel()
    
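        # queue="" asks the broker to generate a name for the exclusive queue.
        result = channel.queue_declare(queue="", exclusive=True)
    
        channel.queue_bind(queue=result.method.queue,
                           exchange="my-exchange",
                           routing_key="*.*.*.*.*")
    
        # pika < 1.0 signature; pika 1.x uses
        # basic_consume(queue, on_message_callback, auto_ack=True).
        channel.basic_consume(callback_func, result.method.queue, no_ack=True)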
    
        channel.start_consuming()
    

    and then run it with:

    $ python single_consume.py host1
    $ python single_consume.py host2  # e.g. on another console
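    If you'd rather start all consumers with one command instead of separate consoles, a minimal launcher sketch using subprocess could look like this. It assumes the consumer above is saved as single_consume.py next to it; build_commands, launch and HOSTS are hypothetical names, and each child is still a fully independent process.

```python
import subprocess
import sys

# Hypothetical host list; replace with your own RabbitMQ hosts.
HOSTS = ["host1", "host2"]


def build_commands(script, hosts):
    """Return one argv list per host, reusing the current Python interpreter."""
    return [[sys.executable, script, host] for host in hosts]


def launch(script, hosts):
    """Start one consumer process per host and wait for all of them to exit."""
    processes = [subprocess.Popen(cmd) for cmd in build_commands(script, hosts)]
    for process in processes:
        process.wait()
    return [process.returncode for process in processes]


if __name__ == "__main__":
    launch("single_consume.py", HOSTS)
```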
    

    If the work you do on messages is CPU-heavy, and as long as the number of CPU cores is at least the number of consumers, this approach is generally better, since separate processes are not limited by CPython's global interpreter lock the way threads are. The exception is when your queues are empty most of the time, so the consumers won't actually use that CPU time*.

    Async

    Another alternative is to use an asynchronous framework (for example, Twisted) and run the whole thing in a single thread.

    You can no longer use BlockingConnection in asynchronous code; fortunately, pika has an adapter for Twisted:

    from pika.adapters.twisted_connection import TwistedProtocolConnection
    from pika.connection import ConnectionParameters
    from twisted.internet import protocol, reactor, task
    from twisted.python import log
    
    
    class Consumer(object):
        def on_connected(self, connection):
            d = connection.channel()
            d.addCallback(self.got_channel)
            d.addCallback(self.queue_declared)
            d.addCallback(self.queue_bound)
            d.addCallback(self.handle_deliveries)
            d.addErrback(log.err)
    
        def got_channel(self, channel):
            self.channel = channel
    
            return self.channel.queue_declare(exclusive=True)
    
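        def queue_declared(self, queue):
            self._queue_name = queue.method.queue
    
            # Return the Deferred so the next callback waits for the bind.
            return self.channel.queue_bind(queue=self._queue_name,
                                           exchange="my-exchange",
                                           routing_key="*.*.*.*.*")
    
        def queue_bound(self, ignored):
            # no_ack=True to match the other examples; this consumer never
            # acks, so deliveries would otherwise stay unacknowledged.
            return self.channel.basic_consume(queue=self._queue_name,
                                              no_ack=True)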
    
        def handle_deliveries(self, queue_and_consumer_tag):
            queue, consumer_tag = queue_and_consumer_tag
            self.looping_call = task.LoopingCall(self.consume_from_queue, queue)
    
            return self.looping_call.start(0)
    
        def consume_from_queue(self, queue):
            d = queue.get()
    
            return d.addCallback(lambda result: self.handle_payload(*result))
    
        def handle_payload(self, channel, method, properties, body):
            print(body)
    
    
    if __name__ == "__main__":
        consumer1 = Consumer()
        consumer2 = Consumer()
    
        parameters = ConnectionParameters()
        cc = protocol.ClientCreator(reactor,
                                    TwistedProtocolConnection,
                                    parameters)
        d1 = cc.connectTCP("host1", 5672)
        d1.addCallback(lambda protocol: protocol.ready)
        d1.addCallback(consumer1.on_connected)
        d1.addErrback(log.err)
    
        d2 = cc.connectTCP("host2", 5672)
        d2.addCallback(lambda protocol: protocol.ready)
        d2.addCallback(consumer2.on_connected)
        d2.addErrback(log.err)
    
        reactor.run()
    

    This approach becomes more attractive the more queues you consume from and the less CPU-bound the consumers' work is*.

    Python 3

    Since you've mentioned pika, I've restricted myself to Python 2.x-based solutions, because at the time of writing pika has not yet been ported to Python 3.

    But if you want to move to Python >= 3.3, one possible option is to use asyncio with one of the AMQP client libraries (AMQP is the protocol RabbitMQ speaks), e.g. asynqp or aioamqp.
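    To illustrate the single-threaded idea with asyncio, here is a sketch in which two consumers share one event loop. It deliberately avoids the asynqp/aioamqp APIs: fake_deliveries is a hypothetical stand-in for an AMQP client's delivery stream and does no network I/O. It also needs Python 3.7+ (for asyncio.run and async generators), not 3.3.

```python
import asyncio


async def fake_deliveries(host, count):
    # Hypothetical stand-in for a real AMQP client's delivery stream.
    for i in range(count):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        yield "{}: message {}".format(host, i)


async def consume(host, count, received):
    async for body in fake_deliveries(host, count):
        received.append(body)  # plays the role of handle_payload


async def main(hosts, count=3):
    received = []
    # All consumers run concurrently on a single thread's event loop.
    await asyncio.gather(*(consume(host, count, received) for host in hosts))
    return received


if __name__ == "__main__":
    for body in asyncio.run(main(["host1", "host2"])):
        print(body)
```

    Messages from both hosts end up interleaved in one list because each consumer yields control whenever it waits, which is the same property the Twisted version relies on.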

    * Please note that these are very shallow tips. In most cases the choice is not that obvious: what is best for you depends on queue "saturation" (messages per unit of time), what work you do upon receiving these messages, what environment you run your consumers in, etc. There's no way to be sure other than to benchmark all the implementations.