Tags: python, rabbitmq, amqp

AMQP / RabbitMQ multiple consumers but only one getting work?


I'm learning AMQP, so this is probably a misunderstanding of what I'm doing. I'm trying to set up RabbitMQ on my own private server to serve work out to a farm of systems (I have a bunch of images I need to process). I set up a three-step pipeline:

| put image on queue | --work_queue--> | process | --results--> | archive results | 

I installed RabbitMQ and created two queues on the server: 'work_queue' and 'results'. I wrote some Python scripts using amqplib, and I have the pipeline working just fine with one image-processing worker. I've added 100 images to the queue, and my one machine is happily grabbing them one at a time, churning through the data, and putting the results on the results queue.

The problem is, I had assumed that if I started another image-processing worker on another machine, it would simply pull work off the queue as well. This seems to be exactly the case described in the "work queue" tutorial on the RabbitMQ site, so I expected it to just work. However, no matter how many other workers I start, they simply wait forever and never get any work, even though there are plenty of messages waiting in work_queue.

What have I misunderstood? Relevant code that queues the work on the server:

import json
import os

from amqplib import client_0_8 as amqp

conn = amqp.Connection(host="foo:5672", userid="pipeline",
    password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()

....

msg = { 'filename': os.path.basename(i) }

chan.basic_publish(amqp.Message(json.dumps(msg)), exchange='',
        routing_key='work_queue')

And the consumer side on the process worker:

import sys

from amqplib import client_0_8 as amqp

conn = amqp.Connection(host="foo:5672", userid="pipeline",
    password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()

def work_callback(msg):
    ....

chan.basic_consume(callback=work_callback, queue='work_queue')

while True:
    try:
        chan.wait()
    except KeyboardInterrupt:
        print "\nExiting cleanly"
        sys.exit(0)

I can see the other workers are connected:

$ sudo rabbitmqctl list_queues
Listing queues ...
results 0
work_queue      246
...done.

$ sudo rabbitmqctl list_connections 
Listing connections ...
pipeline        192.168.8.1     41553   running
pipeline        XX.YY.ZZ.WW     46676   running
pipeline        192.168.8.4     44482   running
pipeline        192.168.8.6     41884   running
...done.

where XX.YY.ZZ.WW is an external IP. The worker at 192.168.8.6 is churning away on the queue, yet the worker on the external IP sits idle while 246 messages wait in the queue it's supposed to be consuming from.

Thoughts?


Solution

  • Sorry for answering my own question, but I finally got the behaviour I was looking for: I needed to set the QoS prefetch_count to 1. Apparently, without a prefetch_count set, as soon as one client connected, all 200+ messages were delivered to its channel and drained from the queue, so no other worker could see any work unless the original worker disconnected (thus closing the channel and putting the unacknowledged messages back on the queue).

    By adding:

    chan.basic_qos(0, 1, False)

    to my workers, they now only grab one message at a time. Not exactly what I was expecting to happen, but it's working as intended nonetheless.
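    Putting the fix in context, a complete worker might look like the sketch below. This is an illustration only: the hostname, credentials, and the body of the processing step are placeholders carried over from the question, and it assumes the same amqplib client_0_8 API used above (in that library, a delivered message carries msg.channel and msg.delivery_tag, which the callback uses to acknowledge). The key detail is that basic_qos is called before basic_consume, so the broker never buffers more than one unacknowledged message per worker.

    ```python
    import json
    import sys


    def parse_task(body):
        # Tasks arrive as JSON bodies, e.g. '{"filename": "img001.png"}',
        # matching what the publisher side builds with json.dumps.
        return json.loads(body)


    def work_callback(msg):
        task = parse_task(msg.body)
        # ... process task['filename'] here (placeholder) ...
        # Ack only after the work is done, so an unfinished task is
        # redelivered to another worker if this one dies mid-job.
        msg.channel.basic_ack(msg.delivery_tag)


    def main():
        # Broker library imported here; requires py-amqplib installed
        # and a reachable RabbitMQ server.
        from amqplib import client_0_8 as amqp
        conn = amqp.Connection(host="foo:5672", userid="pipeline",
                               password="XXXXX", virtual_host="/")
        chan = conn.channel()
        # prefetch_size=0, prefetch_count=1, global=False: the broker
        # holds back further messages until the current one is acked,
        # so work is dealt out round-robin across connected workers.
        chan.basic_qos(0, 1, False)
        chan.basic_consume(callback=work_callback, queue='work_queue')
        try:
            while True:
                chan.wait()
        except KeyboardInterrupt:
            chan.close()
            conn.close()
            sys.exit(0)


    # To run on a worker machine (needs the broker up):
    # main()
    ```

    Note that this pairs the prefetch_count=1 fix with explicit acknowledgements; with auto-ack, the broker considers a message "done" the moment it is delivered, which defeats the purpose of limiting the prefetch.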