python celery chord

Python Celery nested chord


I'm trying to use nested chords in Celery, but can't get them to work.

My use case is to first run a single task, whose output feeds into a group of multiple tasks; the output of that group is then intended to feed into another single task.

To debug I started with a minimal application inspired by this old issue: https://github.com/celery/celery/issues/4161

My test code is

#!/usr/bin/env python
from celery import Celery

app = Celery('canvastest', backend='redis://', broker='redis://')

@app.task
def i(x):
    return x

To run it I do:

  • celery -A canvastest shell
  • celery -A canvastest worker
  • docker run -p 6379:6379 redis

Inside the interactive shell I can then reproduce the example from the issue linked above as follows:

Python 3.10.6 (main, Nov 14 2022, 16:10:14) [GCC 11.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> chord([i.s(1), i.s(2)])(group(i.s(), i.s())).get(timeout=5)
[[1, 2], [1, 2]]
>>> 

Transforming that to match the first half of what I'm trying to do also works:

>>> chord(i.s(1))(group(i.s(), i.s())).get(timeout=5)
[[1], [1]]
>>> 

Now trying to expand that so the output is sent into a single task at the end is where it falls apart:

>>> chord(chord(i.s(1))(group(i.s(), i.s())))(i.s()).get(timeout=5)
Traceback (most recent call last):
  File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/bin/shell.py", line 71, in _invoke_default_shell
    import IPython  # noqa
ModuleNotFoundError: No module named 'IPython'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/bin/shell.py", line 74, in _invoke_default_shell
    import bpython  # noqa
ModuleNotFoundError: No module named 'bpython'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 1377, in __call__
    return self.apply_async((), {'body': body} if body else {}, **options)
  File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 1442, in apply_async
    return self.run(tasks, body, args, task_id=task_id, **merged_options)
  File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 1506, in run
    header_result_args = header._freeze_group_tasks(group_id=group_id, chord=body, root_id=root_id)
  File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 1257, in _freeze_group_tasks
    results = list(self._freeze_unroll(
  File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 1299, in _freeze_unroll
    yield task.freeze(group_id=group_id,
  File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 304, in freeze
    return self.AsyncResult(tid)
  File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/kombu/utils/objects.py", line 30, in __get__
    return super().__get__(instance, owner)
  File "/usr/lib/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)
  File "/home/jerkern/abbackend/etl/.venv/lib/python3.10/site-packages/celery/canvas.py", line 471, in AsyncResult
    return self.type.AsyncResult
AttributeError: 'AsyncResult' object has no attribute 'AsyncResult'

Am I just writing the code in the wrong way, or am I hitting some bug/limitation where it's not possible to feed the output from a chord into another chord?

Using celery 5.2.7 and redis 7.0.7


Solution

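  • Four months later I found the answer to my problem. Calling chord(header)(callback) invokes the chord immediately and returns a result object rather than a signature. If the chord is to be part of a chain or another canvas construct, the syntax changes to chord(header, callback), which only builds the signature.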