Tags: python, asynchronous, celery, scheduling, group

Celery Groups: How to create a group of already running tasks?


I have a set of long-running tasks that I need to coordinate as a group. The challenge is that each task must be fired as soon as it can be created, not after all of the tasks have been defined (as shown below).

The problem is that group(...) does not accept AsyncResult objects, and calling apply_async on each task before grouping it results in every task running twice.
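
A rough sketch of the two failure modes described above (reusing parallel_tasks as defined below):

# group() wants signatures, not AsyncResult objects
results = [parallel_tasks.apply_async([n]) for n in (1, 2, 3)]
# group(results)  # does not work: these are AsyncResult objects, not signatures

# firing a signature first and then also putting it in a group runs the task twice
sigs = [parallel_tasks.si(n) for n in (1, 2, 3)]
for sig in sigs:
    sig.apply_async()          # task runs now...
group(sigs).apply_async()      # ...and runs again when the group is applied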

import time

import numpy as np
from celery import group, chain, chord, subtask

# `app` is the configured Celery application instance (defined elsewhere)


# Run it
pipeline = rate_limited_task.s(end_task.s())
job_id_to_track = pipeline.apply_async()


# TASKS
@app.task(bind=True)
def rate_limited_task(self, downstream_task):
 
    tasks = []

    for random_number in np.random.randint(1, 10, 1000):
        # ... imagine it takes several seconds to generate a random number. This will loop many times
        
        task = parallel_tasks.si(random_number) # This needs to fire immediately 
        tasks.append(task)


    pipeline = group(tasks) | downstream_task

    return pipeline()

@app.task(bind=True)
def parallel_tasks(self, data):
    # Another long running task
    print(f'sleeping for {data} seconds')
    time.sleep(data)

    return data

@app.task(bind=True)
def end_task(self, results):
    print('End task')
    print(results)

Question: Is it possible to create a group of tasks that are already running (or in any state)?


Current solution (not ideal)

from celery.result import allow_join_result, GroupResult
from celery.utils import uuid

@app.task(bind=True)
def rate_limited_task(self, downstream_task):
 
    tasks = []

    for random_number in np.random.randint(1, 10, 1000):
        # ... imagine it takes several seconds to generate a random number. This will loop many times
        
        task = parallel_tasks.apply_async([random_number])
        tasks.append(task)


    gid = uuid()

    result_set = GroupResult(id=gid, results=tasks)
    result_set.save(backend=self.backend)

    chain(
        group_tasks.s(gid),
        downstream_task
    )()
...

@app.task(bind=True)
def group_tasks(self, group_task_id):

    group_task = GroupResult.restore(group_task_id, app=app, backend=self.backend)

    # Less than ideal solution.
    # Needs logic to handle any failed tasks (solved in the group class)
    # Tasks should have an expiration time so that they are not retried forever

    if not group_task.ready():
        raise self.retry(countdown=1)

    with allow_join_result():
        results = group_task.join()
        return results
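
For context, this workaround leans on GroupResult.save()/GroupResult.restore(). A minimal sketch of that round trip, assuming a result backend (e.g. Redis) that can store group metadata:

from celery.result import GroupResult

res = group(parallel_tasks.si(n) for n in (1, 2, 3)).apply_async()
res.save()                                  # persist the member task ids in the result backend
# ... later, possibly in a different task or process
restored = GroupResult.restore(res.id, app=app)
print(restored.ready(), restored.completed_count())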

Solution

  • You cannot create a group of already-running tasks, because Celery's groups are collections of lazily evaluated signatures, not tasks. (Applying the group is what actually executes the signatures, so the idea of "adding" already-running tasks to one is a non-starter.)

    If you really want to do this, you can reach for Celery's vine.synchronization.barrier implementation, which supports adding tasks and waiting for everything to finish before moving on. That said, as you mentioned, you would have to reimplement almost all of apply_async and __apply_async from Celery's group class to make it viable.

    So, tl;dr, probably not worth it because...


    You can pass a generator into a group, and the group will start executing tasks as soon as the generator yields values to it.

    Here is a PoC showing that this works:

    from celery import Celery, group
    import time, os, random
    app = Celery(
        'tasks', 
        broker=os.environ.get("BROKER"),
        backend=os.environ.get('BACKEND'),
        enable_utc=True,
    )
    
    @app.task(bind=True)
    def end_task(self, results):
        print(f'{self.request.id}: end task')
        print(results)
        return results
    
    @app.task(bind=True)
    def parallel_task(self, x):
        print(f'sleeping for {60/x}s')
        time.sleep(60/x)
        return x + 5
    
    def slow_random(high):
        time.sleep(15)
        value = random.randint(1,high)
        return value
    
    @app.task(bind=True)
    def rate_limited_task(self, downstream_task):
        g = group(parallel_task.si(slow_random(10)) for _ in range(10))
        pipeline = g | downstream_task
        return pipeline.delay().id
    
    pipeline = rate_limited_task.s(end_task.s())
    job_id_to_track = pipeline.delay().get()
    print(job_id_to_track)
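
    Note that rate_limited_task returns the id of the inner group | end_task pipeline, so the caller can track the final result separately. A sketch, assuming the same app and result backend as above:

    from celery.result import AsyncResult

    # job_id_to_track is the id returned by rate_limited_task above
    final = AsyncResult(job_id_to_track, app=app)
    print(final.get())   # blocks until end_task has finished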