What would be Celery's equivalent of a `multiprocessing.JoinableQueue` (or `gevent.queue.JoinableQueue`)?
The functionality I'm looking for is the ability to `.join()` a Celery task queue from a publisher, waiting for all the tasks in the queue to be done.
Waiting for an initial `AsyncResult` or `GroupResult` isn't going to be sufficient, as the queue is filled dynamically by the workers themselves.
It might not be perfect, but this is what I eventually came up with.
It's basically a `JoinableQueue` wrapper on top of an existing Celery queue, based on a shared Redis counter and a list listener. It requires the queue name to be the same as its routing key, because the `before_task_publish` handler only sees the message's routing key, while the `task_postrun` handler checks the task's `queue` attribute.
`joinableceleryqueue.py`:
```python
from celery.signals import before_task_publish, task_postrun
from redis import Redis

import settings  # project settings module providing REDIS_URL

memdb = Redis.from_url(settings.REDIS_URL)


class JoinableCeleryQueue(object):
    def __init__(self, queue):
        self.queue = queue
        self.register_queue_hooks()

    def begin(self):
        # Reset the pending-task counter and drop any stale "finished" marker
        memdb.set(self.count_prop, 0)
        memdb.delete(self.finished_prop)

    @property
    def count_prop(self):
        return "jqueue:%s:count" % self.queue

    @property
    def finished_prop(self):
        return "jqueue:%s:finished" % self.queue

    def task_add(self, routing_key, **kw):
        # before_task_publish: runs in whichever process publishes the task
        if routing_key != self.queue:
            return
        memdb.incr(self.count_prop)

    def task_done(self, task, **kw):
        # task_postrun: runs in the worker once the task has finished
        if task.queue != self.queue:
            return
        # DECR returns the new value atomically; comparing that int avoids
        # both a DECR/GET race and GET returning bytes on Python 3
        if memdb.decr(self.count_prop) == 0:
            memdb.rpush(self.finished_prop, 1)

    def register_queue_hooks(self):
        before_task_publish.connect(self.task_add)
        task_postrun.connect(self.task_done)

    def join(self):
        # Block until a worker pushes the "finished" marker
        memdb.brpop(self.finished_prop)
```
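In `task_done`, checking the counter with a separate GET after DECR leaves a window in which two workers finishing their last tasks at the same moment both read 0 and both push a marker; testing the value DECR itself returns avoids that, because Redis performs the decrement and returns the result as one atomic step. A leftover marker is not harmless: `join()` pops only one, so the next `join()` on the same queue would return at once unless the list is cleared first. The replay below is a hypothetical plain-Python model (no Redis; `finishers_seeing_zero` is a made-up name) that walks through one such interleaving step by step.

```python
def finishers_seeing_zero(atomic):
    """Hypothetical model: two workers finish the last two tasks concurrently.

    Replays one fixed interleaving and returns how many workers would push
    a "finished" marker.
    """
    counter = {"value": 2}
    markers = []

    def decr():
        counter["value"] -= 1
        return counter["value"]  # Redis DECR also returns the new value

    if atomic:
        # Each worker tests the value its own DECR returned
        seen_a = decr()  # worker A sees 1
        seen_b = decr()  # worker B sees 0
    else:
        # Both DECRs land before either worker issues its GET
        decr()  # worker A
        decr()  # worker B
        seen_a = counter["value"]  # worker A's GET reads 0
        seen_b = counter["value"]  # worker B's GET reads 0

    for seen in (seen_a, seen_b):
        if seen == 0:
            markers.append(1)  # models RPUSH onto the finished list
    return len(markers)


print(finishers_seeing_zero(atomic=False))  # 2: one marker is left over
print(finishers_seeing_zero(atomic=True))   # 1
```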
I've chosen `BRPOP` over pub/sub because only one listener, the publisher, needs to receive the "all tasks finished" event.
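A list has a second advantage here: the marker stays in it until it is popped, so `join()` still returns if every task finished before it was called, whereas Redis drops a pub/sub message that has no subscriber at that moment. The toy model below uses hypothetical classes `ListChannel` and `PubSubChannel` built on `queue.Queue`; they only mimic the delivery semantics and are not Redis clients.

```python
import queue


class ListChannel:
    """Hypothetical stand-in for a Redis list used with RPUSH/BRPOP."""

    def __init__(self):
        self._items = queue.Queue()

    def rpush(self, value):
        self._items.put(value)  # the value is stored until someone pops it

    def brpop(self, timeout):
        try:
            return self._items.get(timeout=timeout)
        except queue.Empty:
            return None  # redis-py's brpop also returns None on timeout


class PubSubChannel:
    """Hypothetical stand-in for Redis pub/sub: fire-and-forget delivery."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self):
        inbox = queue.Queue()
        self._subscribers.append(inbox)
        return inbox

    def publish(self, value):
        for inbox in self._subscribers:
            inbox.put(value)
        return len(self._subscribers)  # PUBLISH returns the receiver count


# Every task finishes before the publisher gets around to calling join()
lst = ListChannel()
lst.rpush(1)
print(lst.brpop(timeout=0.1))  # 1: the marker waited in the list

ps = PubSubChannel()
print(ps.publish(1))  # 0: nobody was subscribed, so the message is dropped
inbox = ps.subscribe()
try:
    print(inbox.get(timeout=0.1))
except queue.Empty:
    print("no message")  # a late subscriber waiting without a timeout would hang
```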
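Using a `JoinableCeleryQueue` is pretty simple: call `begin()` before adding any tasks to the queue, add tasks using the regular Celery API, then call `.join()` to wait for all the tasks to be done. The workers need a `JoinableCeleryQueue` for the same queue as well (for example, created at module level in the tasks module), because signal handlers are registered per process and `task_postrun` fires in the worker.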
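The begin/publish/join flow, including workers that publish their own follow-up tasks, can be checked without Celery or Redis. The sketch below is a hypothetical in-memory model, not the original code: `CountingJoin`, `on_publish`, `on_postrun` and `run` are made-up names, a lock-protected integer stands in for the Redis counter, and `queue.Queue` objects stand in for the Celery queue and the finished list.

```python
import queue
import threading


class CountingJoin:
    """Hypothetical model of the wrapper's counter key and finished list."""

    def __init__(self):
        self._lock = threading.Lock()
        self.count = 0  # starts at 0, like after begin()
        self.finished = queue.Queue()

    def on_publish(self):
        # Models the before_task_publish hook (INCR)
        with self._lock:
            self.count += 1

    def on_postrun(self):
        # Models the task_postrun hook; decrement and test happen together,
        # as with Redis DECR, so only one caller sees zero
        with self._lock:
            self.count -= 1
            reached_zero = self.count == 0
        if reached_zero:
            self.finished.put(1)

    def join(self, timeout=None):
        # Models BRPOP: block until the "finished" marker is pushed
        return self.finished.get(timeout=timeout)


def run(depth, n_workers=4):
    """Run a binary task tree whose tasks publish their own children."""
    jq = CountingJoin()
    work = queue.Queue()  # stands in for the Celery queue
    done = []

    def publish(item):
        jq.on_publish()
        work.put(item)

    def worker():
        while True:
            item = work.get()
            if item is None:
                return
            done.append(item)
            if item > 0:
                # Children are published before this task's postrun fires,
                # so the counter cannot reach zero while work remains
                publish(item - 1)
                publish(item - 1)
            jq.on_postrun()

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    publish(depth)
    jq.join(timeout=10)
    for _ in threads:
        work.put(None)  # stop the worker threads
    for t in threads:
        t.join()
    return len(done)


print(run(4))  # 31: a depth-4 binary tree has 2**5 - 1 tasks
```

The ordering inside `worker()` is what keeps the count safe: follow-up work is published before the parent's postrun hook fires, which matches a Celery task calling `.delay()` in its body, since `before_task_publish` fires in the worker before that task's `task_postrun`.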