
Collect results from parallel Celery task executions


I am trying to build a rather simple Celery workflow in which I receive the results of multiple parallel invocations of the same task as a tuple (or list).

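from celery import group

# `app` is the project's Celery application, configured with a broker and a result backend
@app.task
def add(x, y):
    return x + y

@app.task
def master():
    return group(add.s(1, 2), add.s(3, 4))()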

From this, I would like to retrieve (3, 7) in a generic way, that is, in a way that does not depend on the structure of the workflow itself. I am looking for some sort of "reduce async result graph to primitives" operation. I have experimented with the following (I have replaced result IDs with #num for brevity):

r = master.delay()
r.get()      # <GroupResult: #1 [#2, #3]>
r.collect()  # [(<AsyncResult: #0>, <GroupResult: #1 [#2, #3]>),
             #  (<GroupResult: #1 [#2, #3]>, [3, 7]),
             #  (<AsyncResult: #2>, 3),
             #  (<AsyncResult: #3>, 7)]

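r.get() returns a GroupResult wrapping two AsyncResult objects, so I would have to resolve each one recursively. r.collect() comes close, but it recurses too deeply: besides the [3, 7] I want, it also yields the top-level pair and each individual child.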

I could do something like

r.children[0].get()

but this is not generic, since it explicitly depends on the structure of the result graph. Also, I could iterate through r.collect() until I find a tuple whose value is not an instance of ResultBase, like

from celery.result import ResultBase

next(value for _, value in r.collect() if not isinstance(value, ResultBase))

but I am not sure whether this is actually correct in all cases, and I was hoping there is a more elegant way to do this.

If there is a way of restructuring the master task to make retrieving the results easier, I am open to it, as long as the subtasks are launched in parallel. Any suggestions would be appreciated. Thank you in advance.


EDIT: A related issue is that if I want to retrieve the task results in a non-blocking way (for example, by manually polling r.status before calling r.get() or r.collect()), I cannot simply do this:

from celery.states import READY_STATES

r = master.delay()

# some time later...
if r.status in READY_STATES:
    r.get()

because r is an AsyncResult whose value is a GroupResult: the master task finishes as soon as it has dispatched the group, i.e. before the group's subtasks complete. Is there a way to invoke the group that "skips" the top-level AsyncResult? This would solve both issues, since r.status and r.get() would then reflect the status and values of the subtasks, respectively.


Solution

  • Of course, the correct solution turned out to be the simplest one: call master as a function to execute it in the current process.

    r = master()
    r.get()      # [3, 7]
    r.collect()  # [(<GroupResult: #1 [#2, #3]>, [3, 7]),
                 #  (<AsyncResult: #2>, 3),
                 #  (<AsyncResult: #3>, 7)]
    

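    Instead of deferring the code that starts the group to a worker process, this starts the group directly in the current process. Since launching a group is itself asynchronous (the call returns a GroupResult immediately while the subtasks run on the workers), the subtasks still execute in parallel, and the overhead of sending master through the broker to a worker disappears.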