I am using Celery for task management and am having trouble running multiple tasks in a group. After all the tasks in the group are complete, I want to collect the results. The workflow works fine if there is only one task in the group: it waits for that task to complete. However, it fails if there are two or more tasks in the group, or perhaps I am not running it correctly. Below is the code sample:
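from celery import group

# celery2 (the Celery app) and log (a logger) are assumed to be defined elsewhere.

@celery2.task(name='square')
def square(a):
    log.info(f'In square group {a}')
    return a ** 2

@celery2.task(name='add_one')
def add_one(a):
    b = a + 1
    return b

@celery2.task(name='add_one_and_square')
def add_one_and_square(a):
    # Builds the add_one -> square chain signature; it does not run it.
    return (add_one.s(a) | square.s())

@celery2.task(name='collect')
def collect(a):
    return a

@celery2.task(name='group-task')
def group_square(num):
    # Returns a group of chain signatures as the task result.
    return group([add_one_and_square(i) for i in range(num)])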
Running the Celery workflow:
res = (add.s(2, 3) | group_square.s() | collect.s())
res.apply_async()
Below is the output I captured. I can see that the signatures are created, but I am not sure whether this is the right approach, or how to run a chain of tasks inside a single group so that it behaves the same way as a group containing a single task.
{'task': 'celery.group',
'args': [],
'kwargs': {'tasks': [{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [0],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [1],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [2],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [3],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None},
{'task': 'celery.chain',
'args': [],
'kwargs': {'tasks': [{'task': 'add_one',
'args': [4],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None},
{'task': 'square',
'args': [],
'kwargs': {},
'options': {},
'subtask_type': None,
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'chain',
'immutable': False,
'chord_size': None}]},
'options': {},
'subtask_type': 'group',
'immutable': False,
'chord_size': None}
I'd appreciate any insight. Thanks!
I think that running Celery tasks from within another Celery task is bad practice and may lead to a deadlock in some cases (I believe this is also mentioned somewhere in the documentation). If you do want to do it, it is safer to launch the inner tasks asynchronously rather than waiting for their results inside the task.
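To make the deadlock risk concrete, here is a minimal sketch that does not use Celery at all. It is only an analogy: a `ThreadPoolExecutor` with one thread stands in for a worker with a single slot, and `child`, `parent_blocking`, and `parent_async` are hypothetical names. A task that waits synchronously for its subtask holds the only slot, so the subtask can never start; launching it asynchronously and returning the handle avoids that.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

# A pool with a single thread stands in for a worker with concurrency 1.
pool = ThreadPoolExecutor(max_workers=1)


def child():
    return 42


def parent_blocking():
    # Submits a subtask and waits for it synchronously. The only thread is
    # busy running this function, so the child never gets a chance to start.
    future = pool.submit(child)
    try:
        return future.result(timeout=0.5)
    except TimeoutError:
        future.cancel()  # drop the queued child so the pool stays usable
        return 'deadlock (timed out)'


def parent_async():
    # Submits the subtask and returns its handle without waiting for it.
    return pool.submit(child)


blocking_outcome = pool.submit(parent_blocking).result()
child_future = pool.submit(parent_async).result()
async_outcome = child_future.result(timeout=5)
pool.shutdown()

print(blocking_outcome, async_outcome)
```

The timeout is there only so the script terminates; without it, `parent_blocking` would wait forever.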
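In your scenario, I suggest adding the collect call inside the group_square task. Something like:
@celery2.task(name='group-task')
def group_square(num):
    # Call add_one_and_square(i) directly, as in the question, so each group
    # member is the add_one | square chain itself. Using .si(i) would only
    # schedule a task that returns the chain signature without running it.
    canvas_flow = group([add_one_and_square(i) for i in range(num)]) | collect.s()
    # Return the id: with the default JSON serializer, an AsyncResult object
    # cannot be stored as a task result.
    return canvas_flow.apply_async().id
Now the result of group_square is the id of the AsyncResult for the collect step. You can rebuild it with celery2.AsyncResult(task_id), check whether it is .ready(), and then read its .result (or call .get() from outside a task).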