python multithreading rabbitmq pika kombu

Python Kombu - blocking


I'm using kombu to manage RabbitMQ with a producer/consumer model. My producer placed 100 jobs on a queue (I have only one queue and one exchange). I would like to launch multiple consumers simultaneously and have each consumer process one job at a time. Unfortunately, the consumers are blocking each other: while one consumer is working on jobs from the queue, the other consumers just sit idle. If I kill the working consumer, one of the other consumers kicks in and starts working. Is there a way to have all the consumers running simultaneously, each processing a different job from the queue? My consumer code is below:

def start_consumer(self, incoming_exchange_name):
    if self.rabbitmq_connection.connected:
        callbacks=[]
        queues=[]

        callbacks.append(self._callback)
        queues.append(self.incoming_queue)

        print('opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name))
        self.incoming_exchange(settings.rabbitmq_connection).declare()
        self.incoming_queue(settings.rabbitmq_connection).declare()

        with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
            # Block forever, handing each delivered message to the callbacks.
            while True:
                try:
                    self.rabbitmq_connection.drain_events()
                except Exception as e:
                    print('Error -> %s' % e)

Solution

  • You need to set the consumer prefetch count to 1 (https://kombu.readthedocs.org/en/latest/reference/kombu.transport.pyamqp.html#kombu.transport.pyamqp.Connection.Channel.basic_qos). Without a prefetch limit, the broker can deliver every ready message to the first consumer that connects, leaving nothing for the others. With a prefetch count of 1, each consumer takes only one unacknowledged message at a time, and the rest stay in the queue in the ready state. So with 2 consumers at a prefetch count of 1 and 100 messages, 2 jobs are processed simultaneously.

    I've added the missing parts to your code to set the prefetch count:

    def start_consumer(self, incoming_exchange_name):
        if self.rabbitmq_connection.connected:
            callbacks=[]
            queues=[]

            callbacks.append(self._callback)
            queues.append(self.incoming_queue)

            print('opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name))
            self.incoming_exchange(settings.rabbitmq_connection).declare()
            self.incoming_queue(settings.rabbitmq_connection).declare()

            # New: open a channel and limit it to one unacknowledged message at a time.
            channel = self.rabbitmq_connection.channel()
            channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)

            # New: pass that channel to the consumer so the limit applies to it.
            with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks, channel=channel) as consumer:
                while True:
                    try:
                        self.rabbitmq_connection.drain_events()
                    except Exception as e:
                        print('Error -> %s' % e)
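
To see why the prefetch count matters, here is a small pure-Python sketch of how a broker hands out messages that are already waiting when the consumers subscribe. It is a simplified model, not kombu or RabbitMQ code: the `dispatch` function and the consumer names are made up for illustration, it only models the initial delivery, and it assumes a `prefetch_count` of 0 means "no limit", as in AMQP.

```python
from collections import deque


def dispatch(messages, consumers, prefetch_count):
    """Toy model: hand waiting messages to consumers in subscription order.

    Hypothetical helper for illustration only. A prefetch_count of 0 means
    "no limit". Returns (messages held per consumer, messages still ready).
    """
    ready = deque(messages)
    held = {name: [] for name in consumers}
    for name in consumers:  # consumers subscribe one after another
        # Push messages until this consumer reaches its prefetch limit.
        while ready and (prefetch_count == 0 or len(held[name]) < prefetch_count):
            held[name].append(ready.popleft())
    return held, list(ready)


if __name__ == "__main__":
    jobs = list(range(100))
    workers = ["consumer-1", "consumer-2", "consumer-3"]

    # No prefetch limit: how many jobs does each consumer hold?
    held, ready = dispatch(jobs, workers, prefetch_count=0)
    print({name: len(msgs) for name, msgs in held.items()}, len(ready))

    # Prefetch limit of 1: same question.
    held, ready = dispatch(jobs, workers, prefetch_count=1)
    print({name: len(msgs) for name, msgs in held.items()}, len(ready))
```

In this model, with no limit the first consumer ends up holding all 100 jobs and the other two hold none, which matches the behaviour described in the question. With a limit of 1, each consumer holds one job and 97 stay ready for whoever finishes first. Keep in mind that the broker only sends a consumer its next message once the current one has been acknowledged, so your callback should call `message.ack()` when the job is done.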