
Celery: limit the number of instances of a specific task in the queue


I'm using Celery 3.1.x with two tasks. The first task (TaskOne) is enqueued via the celeryd_after_setup signal when the Celery worker starts up:

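from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def enqueue_task_one(sender, instance, **kwargs):
    # TaskOne is defined elsewhere in the project; run it 5 seconds after startup
    TaskOne().apply_async(countdown=5)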

When TaskOne is run, it does some calculations and then enqueues TaskTwo. Imagine the following workflow:

  • I start Celery, so the signal fires and TaskOne is enqueued
  • after the countdown (5 seconds), TaskOne runs and enqueues TaskTwo
  • then I stop Celery (TaskTwo remains in the queue)
  • afterwards I restart Celery
  • the workflow runs again and TaskTwo is enqueued a second time

Now there are two TaskTwo instances in the queue. That is a problem for my workflow: I want at most one TaskTwo in the queue and need to prevent a second one from being enqueued.
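To make the duplication concrete, here is a toy model with no Celery involved: a plain Python list stands in for the broker queue (which survives worker restarts), and a hypothetical `start_worker()` function plays the role of the startup signal plus TaskOne. Without a guard, every restart appends another TaskTwo; with a "skip if already queued" check, only one remains.

```python
# Toy model of the restart scenario; no Celery involved.
# `queue` stands in for the broker queue, which survives worker restarts.


def start_worker(queue, guard=False):
    """Hypothetical stand-in for: startup signal -> TaskOne -> enqueue TaskTwo."""
    if guard and "TaskTwo" in queue:
        return  # TaskTwo is already queued, so skip the duplicate
    queue.append("TaskTwo")


unguarded = []
start_worker(unguarded)  # first start
start_worker(unguarded)  # restart: TaskTwo from the first run is still queued
print(unguarded)  # ['TaskTwo', 'TaskTwo']

guarded = []
start_worker(guarded, guard=True)
start_worker(guarded, guard=True)
print(guarded)  # ['TaskTwo']
```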

My question: How can I achieve this?

With celery.app.control.Inspect.scheduled() I can get the tasks that are scheduled, but they are buried in a nested structure of dicts and lists. Walking through that result might work, but it does not feel right. Is there a better way?
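For reference, `scheduled()` returns a dict keyed by worker name; each value is a list of dicts describing the ETA/countdown tasks that worker is holding, with the task details under a `request` key, and it returns `None` if no worker replies. The sample below only mimics that shape with made-up values, and `scheduled_task_names` is a hypothetical helper that flattens it into a list of task names so the nesting is walked in one place.

```python
# Sample reply shaped like Celery's inspect().scheduled() output.
# Worker names, ids and task names below are made up for illustration.
sample_reply = {
    "worker1@example.com": [
        {
            "eta": "2015-01-01T12:00:05",
            "priority": 0,
            "request": {
                "id": "1a7980ea-8b19-413e-91d2-0b74f3844c4d",
                "name": "myapp.tasks.TaskTwo",
                "args": "[]",
                "kwargs": "{}",
            },
        }
    ],
    "worker2@example.com": [],
}


def scheduled_task_names(reply):
    """Flatten {worker: [{'request': {'name': ...}}, ...]} into task names.

    `reply` may be None when no worker answers the broadcast.
    """
    return [
        entry["request"]["name"]
        for entries in (reply or {}).values()
        for entry in entries
    ]


print(scheduled_task_names(sample_reply))  # ['myapp.tasks.TaskTwo']
print(scheduled_task_names(None))  # []
```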


Solution

  • After considering several options, I chose to use app.control.inspect. It's not a pretty solution, but it works:

    # e.g. in TaskOne, before it enqueues TaskTwo
    # assumes `app` is the Celery instance and `logger` a configured logger
    # fetch all scheduled (ETA/countdown) tasks; scheduled() returns None if no worker replies
    scheduled_tasks = app.control.inspect().scheduled() or {}

    # scheduled_tasks maps each worker name to a list of dicts,
    # see http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#dump-of-scheduled-eta-tasks
    for task_values in scheduled_tasks.values():
        for task in task_values:
            if task['request']['name'] == TaskTwo.name:
                logger.info('TaskTwo is already scheduled, skipping additional run')
                return
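The same check can be packaged as a function so it can be reused and tested without a running broker. This is a sketch, assuming the caller passes any object with a `scheduled()` method (in production, `app.control.inspect()`); the function name `is_task_scheduled` and the `FakeInspect` test double are hypothetical. Keep in mind that `scheduled()` only reports ETA/countdown tasks that a worker has already received: a message still sitting in the broker, or a task that is currently executing, will not show up in it.

```python
def is_task_scheduled(inspector, task_name):
    """Return True if any worker reports `task_name` among its scheduled (ETA) tasks.

    `inspector` is anything with a scheduled() method, e.g. app.control.inspect().
    """
    reply = inspector.scheduled() or {}  # None when no worker replies
    return any(
        entry["request"]["name"] == task_name
        for entries in reply.values()
        for entry in entries
    )


class FakeInspect(object):
    """Hypothetical test double standing in for app.control.inspect()."""

    def __init__(self, reply):
        self.reply = reply

    def scheduled(self):
        return self.reply


busy = FakeInspect({"celery@host1": [{"request": {"name": "myapp.tasks.TaskTwo"}}]})
idle = FakeInspect(None)

print(is_task_scheduled(busy, "myapp.tasks.TaskTwo"))  # True
print(is_task_scheduled(idle, "myapp.tasks.TaskTwo"))  # False
```

Inside TaskOne the guard would then read `if is_task_scheduled(app.control.inspect(), TaskTwo.name): return`, assuming `app` is your Celery instance. Passing the inspector in, rather than calling `app.control.inspect()` inside the function, is what lets the fake replace a live worker in tests.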