Say I have made a connection to RabbitMQ like the following:
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue=getting_from_this_queue)
channel.basic_consume(
    callback, queue=getting_from_this_queue, no_ack=False)
channel.basic_qos(prefetch_count=3)
To achieve better concurrency, I tried placing every job in an internal Queue, and created a while loop that asynchronously dispatches a worker for every job retrieved from this internal Queue:
from Queue import Queue
from multiprocessing.dummy import Pool as ThreadPool

task_queue = Queue(10)
pool = ThreadPool(20)  # the thread pool is imported above under the name ThreadPool

def worker(ch, method, job):
    # ...some heavy lifting...
    if job_gets_done:  # some abstraction
        print "job success"
        ch.basic_ack(delivery_tag=method.delivery_tag)  # PROBLEM: this does not seem to work
    else:
        print "job failed"

def callback(ch, method, properties, job):
    task_queue.put((ch, method, job))  # put job in internal queue, block if full.

@threaded  # helper decorator, definition not shown
def async_process_jobs():  # loop to get a job and start a thread worker.
    while True:
        params = task_queue.get()
        pool.apply_async(worker, params)  # params = (ch, method, job)

async_process_jobs()
channel.start_consuming()
The problem is that while the jobs are being processed, none of them are acknowledged properly, even though the execution flow really goes through basic_ack() (i.e. it prints "job success"). The queue size on RabbitMQ stays the same. Why?
In a somewhat official tutorial, basic_ack() was placed inside callback(), but mine is not. Could this be the source of the problem?
Detailed behaviour (may not be important): assume I have 10000 jobs in the queue. In the beginning, some 2000 messages go into the Unacked state, then all of them go back to the Ready state, even though my workers are still processing and printing "job success" (acking).
From the FAQ of pika:
Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads.
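If that FAQ entry applies here, one way to respect it would be to keep every basic_ack() call on the thread that owns the connection and let the workers only report which delivery tags are finished. Below is a minimal sketch of that hand-off, not a tested fix for the code above. FakeChannel, done_queue, drain_acks and run_demo are hypothetical names invented for illustration, and FakeChannel merely stands in for a real pika channel so that the example runs without a broker.

```python
import threading
from multiprocessing.dummy import Pool as ThreadPool

try:
    from queue import Queue, Empty   # Python 3
except ImportError:
    from Queue import Queue, Empty   # Python 2


class FakeChannel(object):
    """Hypothetical stand-in for a pika channel (assumption, not pika's API)."""

    def __init__(self):
        self.owner = threading.current_thread()
        self.acked = []

    def basic_ack(self, delivery_tag):
        # Mimic the "one connection per thread" rule: only the owner may ack.
        assert threading.current_thread() is self.owner, "ack from wrong thread"
        self.acked.append(delivery_tag)


def worker(delivery_tag, job, done_queue):
    # ...heavy lifting would go here; the channel is never touched...
    done_queue.put(delivery_tag)  # report completion to the owning thread


def drain_acks(channel, done_queue):
    """Call on the thread that owns the channel; acks all finished jobs."""
    count = 0
    while True:
        try:
            tag = done_queue.get_nowait()
        except Empty:
            return count
        channel.basic_ack(delivery_tag=tag)
        count += 1


def run_demo(n_jobs=5):
    channel = FakeChannel()          # created on the "connection" thread
    done_queue = Queue()
    pool = ThreadPool(3)
    for tag in range(1, n_jobs + 1):
        pool.apply_async(worker, (tag, "job-%d" % tag, done_queue))
    pool.close()
    pool.join()                      # wait until every worker has reported
    drain_acks(channel, done_queue)  # all acks happen on this thread
    return sorted(channel.acked)
```

With a real BlockingConnection, the draining step would have to be interleaved with consuming on the connection's own thread. Newer pika releases also offer BlockingConnection.add_callback_threadsafe() for scheduling a call such as basic_ack on that thread from a worker; whether it is available depends on the installed version.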