I'm using kombu to manage RabbitMQ, via a producer/consumer model. I launched my producer, which placed 100 jobs on a queue (I have only one queue, and one exchange). I would like to launch multiple consumers, simultaneously, and have each consumer process one job at a time. Unfortunatly the consumers are blocking each other (i.e. as one consumer grabs a job from the queue, the other consumers are just sitting idle). If I kill the working consumer, then one of the other consumers kicks in and starts working. Is there a way to have all the consumers running simultaneously, each processing a different job from the queue? My consumer code is below:
def start_consumer(self, incoming_exchange_name):
if self.rabbitmq_connection.connected:
callbacks=[]
queues=[]
callbacks.append(self._callback)
queues.append(self.incoming_queue)
print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
self.incoming_exchange(settings.rabbitmq_connection).declare()
self.incoming_queue(settings.rabbitmq_connection).declare()
with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
while True:
try:
self.rabbitmq_connection.drain_events()
except Exception as e:
print 'Error -> %s' % e.message
You need to set the consumer prefetch to 1 (https://kombu.readthedocs.org/en/latest/reference/kombu.transport.pyamqp.html#kombu.transport.pyamqp.Connection.Channel.basic_qos), that way each consumer will just grab 1 message, and leave the rest in the queue with the state ready, so if you have 2 consumers with QOS set to 1 and you have 100 messages you will be processing 2 simultaneous tasks.
I've added the missing parts to your code, to set the prefetch count
def start_consumer(self, incoming_exchange_name):
if self.rabbitmq_connection.connected:
callbacks=[]
queues=[]
callbacks.append(self._callback)
queues.append(self.incoming_queue)
print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
self.incoming_exchange(settings.rabbitmq_connection).declare()
self.incoming_queue(settings.rabbitmq_connection).declare()
channel = self.rabbitmq_connection.channel()
channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)
with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks, channel=channel) as consumer:
while True:
try:
self.rabbitmq_connection.drain_events()
except Exception as e:
print 'Error -> %s' % e.message