Celery equivalent of a JoinableQueue

What would be Celery's equivalent of a multiprocessing.JoinableQueue (or gevent.queue.JoinableQueue)?

The functionality I'm looking for is the ability to .join() a Celery task queue from a publisher, waiting for the all tasks in the queue to be done.

Waiting for an initial AsyncResult or GroupResult isn't going to be sufficient, as the queue dynamically fills up by the workers themselves.


  • It might not be perfect, but this is what I came up with eventually.

    It's basically a JoinableQueue wrapper on top of an existing Celery queue, based on a shared Redis counter and a list listener. It requires the queue name to be the same as it's routing key (due to internal implementation details of the before_task_publish and task_postrun signals).

    from celery.signals import before_task_publish, task_postrun
    from redis import Redis
    import settings
    memdb = Redis.from_url(settings.REDIS_URL)
    class JoinableCeleryQueue(object):
        def __init__(self, queue):
            self.queue = queue
        def begin(self):
            memdb.set(self.count_prop, 0)
        def count_prop(self):
            return "jqueue:%s:count" % self.queue
        def finished_prop(self):
            return "jqueue:%s:finished" % self.queue
        def task_add(self, routing_key, **kw):
            if routing_key != self.queue:
        def task_done(self, task, **kw):
            if task.queue != self.queue:
            if memdb.get(self.count_prop) == "0":
                memdb.rpush(self.finished_prop, 1)
        def register_queue_hooks(self):
        def join(self):

    I've chosen to use BRPOP instead of a pub/sub as I only need one listener listening to the "all task finished" event (the publisher).

    Using a JoinableCeleryQueue is pretty simple - begin() before adding any tasks to the queue, add tasks using regular Celery API, .join() to wait for all the tasks to be done.