Tags: python, queue, celery, python-multiprocessing, gevent

Celery equivalent of a JoinableQueue


What would be Celery's equivalent of a multiprocessing.JoinableQueue (or gevent.queue.JoinableQueue)?

The functionality I'm looking for is the ability to .join() a Celery task queue from a publisher, waiting for all the tasks in the queue to be done.

Waiting for an initial AsyncResult or GroupResult isn't going to be sufficient, as the queue is dynamically filled up by the workers themselves.
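
For illustration (the crawl task and discover_links helper here are made up, not part of my actual code), the workers themselves keep publishing follow-up tasks onto the same queue, so any result handle taken over the initial batch misses them:

    from celery import Celery

    app = Celery("example", broker="redis://localhost:6379/0")  # placeholder broker URL

    def discover_links(url):
        # hypothetical stand-in for whatever real work produces follow-up items
        return []

    @app.task
    def crawl(url):
        for link in discover_links(url):
            # the worker publishes new tasks onto the same queue, so a
            # GroupResult over the initial batch never covers these
            crawl.delay(link)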


Solution

  • It might not be perfect, but this is what I came up with eventually.

    It's basically a JoinableQueue wrapper on top of an existing Celery queue, based on a shared Redis counter and a list listener. It requires the queue name to be the same as its routing key, due to internal implementation details of the before_task_publish and task_postrun signals (see the routing sketch after the code).

    joinableceleryqueue.py:

    from celery.signals import before_task_publish, task_postrun
    from redis import Redis
    import settings
    
    memdb = Redis.from_url(settings.REDIS_URL)
    
    class JoinableCeleryQueue(object):
        def __init__(self, queue):
            self.queue = queue
            self.register_queue_hooks()
    
        def begin(self):
            # reset the pending-task counter; call this before publishing anything
            memdb.set(self.count_prop, 0)
    
        @property
        def count_prop(self):
            return "jqueue:%s:count" % self.queue
    
        @property
        def finished_prop(self):
            return "jqueue:%s:finished" % self.queue
    
        def task_add(self, routing_key, **kw):
            # fires on the publisher for every task sent to this queue
            if routing_key != self.queue:
                return
    
            memdb.incr(self.count_prop)
    
        def task_done(self, task, **kw):
            # fires on the worker after every task from this queue finishes
            if task.queue != self.queue:
                return
    
            # DECR returns the new value as an int, so checking it directly is
            # atomic and avoids comparing against the bytes returned by GET
            if memdb.decr(self.count_prop) == 0:
                memdb.rpush(self.finished_prop, 1)
    
        def register_queue_hooks(self):
            before_task_publish.connect(self.task_add)
            task_postrun.connect(self.task_done)
    
        def join(self):
            # block until task_done pushes the "all tasks finished" marker
            memdb.brpop(self.finished_prop)
    

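    To satisfy the queue-name-equals-routing-key requirement, a configuration roughly like the following can be used (a sketch only; the "work" queue and the myapp.tasks.crawl task name are placeholders):

    from celery import Celery
    from kombu import Queue

    app = Celery("example", broker="redis://localhost:6379/0")  # placeholder broker URL

    # declare the queue with a routing key equal to its name, and route the
    # relevant task(s) through it so before_task_publish sees that routing key
    app.conf.task_queues = (Queue("work", routing_key="work"),)
    app.conf.task_routes = {
        "myapp.tasks.crawl": {"queue": "work", "routing_key": "work"},
    }
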
    I've chosen to use BRPOP instead of a pub/sub channel, as only one listener (the publisher) needs to receive the "all tasks finished" event.

    Using a JoinableCeleryQueue is pretty simple: call begin() before adding any tasks to the queue, add tasks using the regular Celery API, and call .join() to wait for all the tasks to be done.
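
    For example (a rough sketch; the "work" queue and the crawl task are placeholder names, assuming tasks are routed to "work" as above):

    from joinableceleryqueue import JoinableCeleryQueue
    from myapp.tasks import crawl  # hypothetical task routed to the "work" queue

    jqueue = JoinableCeleryQueue("work")
    jqueue.begin()                      # reset the shared counter before publishing

    crawl.delay("https://example.com")  # publish the initial task(s) as usual

    jqueue.join()                       # blocks until the counter drops back to zero
    print("all tasks, including ones added by the workers, are done")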