
Add tasks to current_task group in celery


I have a chain of tasks in which the result of the first few tasks determines the size of a following group, so I can't know ahead of time how big that group should be.


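from celery import Celery, chain, group

celery = Celery("tasks")  # hypothetical app name; broker settings omitted

@celery.task
def rep():
    print("hello world!")

@celery.task
def add(x, y):
    return x + y

@celery.task
def mul(x, y):
    return x * y

def start_tasks(x, y, z):
    tasks = chain(
        mul.s(x, y),
        add.s(z),
        # Placeholder: this group should hold x * y + z rep tasks, but that
        # number is only known after mul and add have run.
        group(should_have_x_times_y_plus_z_rep_tasks)
    )
    result = tasks.apply_async()
    result.save()
    return result.id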
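
To make the intended data flow concrete, here is a plain-Python model of what that chain is meant to compute. It is only a sketch: it uses no Celery and no broker, run_pipeline is a hypothetical helper, and rep returns a string instead of printing so the number of calls can be checked. As in a chain, each step passes its result to the next one as the first positional argument, so add.s(z) receives mul's result as x.

```python
from typing import List


def mul(x: int, y: int) -> int:
    return x * y


def add(x: int, y: int) -> int:
    return x + y


def rep() -> str:
    # Stand-in for the rep task: returns instead of printing.
    return "hello world!"


def run_pipeline(x: int, y: int, z: int) -> List[str]:
    """Hypothetical sequential model of chain(mul.s(x, y), add.s(z), group(...))."""
    # add receives mul's result as its first argument, as it would in a chain.
    n = add(mul(x, y), z)
    # Only at this point is the size of the fan-out known.
    return [rep() for _ in range(n)]


if __name__ == "__main__":
    print(len(run_pipeline(2, 3, 4)))  # 2 * 3 + 4 = 10
```

With x=2, y=3, z=4 the group needs 10 rep calls, a number that only exists after the first two steps have run, which is why it cannot be written into the chain up front.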

One way to solve this is an intermediate task (or callback) that receives the result of add and starts as many tasks as required. The drawback is that I then have to check separately whether the tasks spawned by the intermediate task have finished.
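
The drawback of a fire-and-forget intermediate step can be shown without Celery. In this asyncio sketch (an analogue, not Celery code; the function names are borrowed from the question), intermediate only schedules its children and returns at once, so its caller gets back handles that still have to be awaited separately.

```python
import asyncio
from typing import List


async def rep() -> str:
    # Stand-in for the rep task; sleep(0) yields control to the event loop.
    await asyncio.sleep(0)
    return "hello world!"


async def intermediate(n: int) -> List[asyncio.Task]:
    # Fire-and-forget fan-out: schedule n children and return immediately
    # without waiting for any of them.
    return [asyncio.create_task(rep()) for _ in range(n)]


async def start_tasks(x: int, y: int, z: int) -> List[str]:
    n = x * y + z  # what mul followed by add would produce
    children = await intermediate(n)
    # intermediate() has already returned, but its children are still pending,
    # so the caller has to keep the handles and wait for them itself.
    return await asyncio.gather(*children)


if __name__ == "__main__":
    print(len(asyncio.run(start_tasks(2, 3, 4))))  # 10
```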

Is there a way for the intermediate task to add those tasks to current_task, so that the main task only finishes once all of them have finished?

Something like this:


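from celery import Celery, chain, current_task

celery = Celery("tasks")  # hypothetical app name; broker settings omitted

@celery.task
def rep():
    print("hello world!")

@celery.task
def add(x, y):
    return x + y

@celery.task
def mul(x, y):
    return x * y

@celery.task
def intermediate(n):
    # Wished-for behaviour: attach the spawned rep tasks to the current task
    # so that it only counts as finished once all of them have finished.
    current_task.children.append([rep.s().apply_async() for _ in range(n)])

def start_tasks(x, y, z):
    tasks = chain(
        mul.s(x, y),
        add.s(z),
        intermediate.s()  # receives add's result as n
    )
    result = tasks.apply_async()
    return result.id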

Solution

  • OK, I found a solution that works. I don't know if it's the best or the proper way, but it seems to work. The current_task object has a replace() method that replaces the running task with another signature while keeping the same task id. So the solution looks like this:

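    from celery import Celery, chain, current_task, group
    
    celery = Celery("tasks")  # hypothetical app name; broker settings omitted
    
    @celery.task
    def rep():
        print("hello world!")
    
    @celery.task
    def add(x, y):
        return x + y
    
    @celery.task
    def mul(x, y):
        return x * y
    
    @celery.task
    def intermediate(n):
        task_group = group([rep.s() for _ in range(n)])
        # replace() swaps this task for the group while keeping the same task id,
        # so the chain's final result is only ready once every rep task has finished.
        current_task.replace(task_group)
    
    def start_tasks(x, y, z):
        tasks = chain(
            mul.s(x, y),
            add.s(z),
            intermediate.s()  # receives add's result as n
        )
        result = tasks.apply_async()
        # The chain returns a plain AsyncResult, which has no save() method.
        return result.id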
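
The replace() approach gives the intermediate step the semantics the question was after: its own completion now depends on all of its children. A rough asyncio analogue (again not Celery code, with names borrowed from the question) is an intermediate step that awaits the whole fan-out with asyncio.gather, so the caller holds a single handle whose result is the list of child results. Inside a Celery worker, by contrast, a task should not block waiting on subtasks; replace() sidesteps that by letting the group take over the task's id instead of waiting for it.

```python
import asyncio
from typing import List


async def rep() -> str:
    # Stand-in for the rep task; sleep(0) yields control to the event loop.
    await asyncio.sleep(0)
    return "hello world!"


async def intermediate(n: int) -> List[str]:
    # Unlike a fire-and-forget fan-out, this step is not finished until every
    # child is, and its result is the list of the children's results.
    return await asyncio.gather(*(rep() for _ in range(n)))


async def start_tasks(x: int, y: int, z: int) -> List[str]:
    n = x * y + z  # what mul followed by add would produce
    # One await covers the whole pipeline, including the dynamic fan-out.
    return await intermediate(n)


if __name__ == "__main__":
    results = asyncio.run(start_tasks(2, 3, 4))
    print(len(results))  # 10
```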