I have a chain of tasks from which the result of the first few tasks will determine the size of a following group so I can't know ahead of time what the size of the group should be.
@celery.task
def rep():
print("hello world!")
@celery.task
def add(x, y):
return x + y
@celery.task
def mul(x, y):
return x * y
def start_tasks(x, y, z):
tasks = chain(
mul.s(x, y),
add.s(z),
group(should_have_x_times_y_plus_z_rep_tasks)
)
result = tasks.apply_async()
result.save()
return results.id
One way I can solve this is to have an intermediate
task/callback that will receive the result of the add
function and start as many tasks as required. This will require that I then check to see if the tasks spawned from the intermediate
function did finish.
I was wondering if there is a way to add those tasks to the current_task
from the intermediate
task somehow so that the main tasks only finishes if all tasks finish.
Something like this
@celery.task
def rep():
print("hello world!")
@celery.task
def add(x, y):
return x + y
@celery.task
def mul(x, y):
return x * y
@celery.task
def intermediate(l):
current_task.children.append([rep.s().apply_async() for _ in range(l)])
def start_tasks(x, y, z):
tasks = chain(
mul.s(x, y),
add.s(z),
intermediate().s()
)
result = tasks.apply_async()
result.save()
return results.id
ok, so I did find a solution that works. I don't know if its the best one or the proper way but it seems to work. The current_task
has a replace
function that can replace the current task with another using the same task id. So the solution would look like this
@celery.task
def rep():
print("hello world!")
@celery.task
def add(x, y):
return x + y
@celery.task
def mul(x, y):
return x * y
@celery.task
def intermediate(l):
task_group = group([rep.s() for _ in range(l)])
current_task.replace(task_group)
def start_tasks(x, y, z):
tasks = chain(
mul.s(x, y),
add.s(z),
intermediate().s()
)
result = tasks.apply_async()
result.save()
return results.id