I am trying to achieve a rather simple Celery workflow, where I receive the result of multiple parallel invocations of the same task as a tuple (or list).
from celery import group

@app.task
def add(x, y):
    return x + y

@app.task
def master():
    # Launch the subtasks in parallel and return the resulting GroupResult
    return group(add.s(1, 2), add.s(3, 4))()
From this, I would like to retrieve (3, 7) in a generic way, that is, in a way that does not depend on the structure of the workflow itself. I am looking for some sort of "reduce async result graph to primitives" operation. I have experimented with the following (I have replaced result IDs with #num for brevity):
r = master.delay()
r.get()     # <GroupResult: #1 [#2, #3]>
r.collect() # [(<AsyncResult: #0>, <GroupResult: #1 [#2, #3]>),
            #  (<GroupResult: #1 [#2, #3]>, [3, 7]),
            #  (<AsyncResult: #2>, 3),
            #  (<AsyncResult: #3>, 7)]
r.get() returns the GroupResult wrapping the two child AsyncResults, so I would have to process each one recursively. r.collect() is close, but it recurses too deeply.
I could do something like
r.children[0].get()
but this is not generic, since it explicitly depends on the structure of the result graph. Alternatively, I could iterate through r.collect() until I find a tuple whose value is not an instance of ResultBase, like
next(value for _, value in r.collect() if not isinstance(value, ResultBase))
but I am not sure whether this is actually correct in all cases, and I was hoping that there is a more elegant way to do it.
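Something along these lines is what I have in mind (a rough sketch only; I am not sure it behaves correctly for every possible result graph):

from celery.result import ResultBase

def resolve(node):
    # Reduce a result-graph node to plain values: wait on any result object
    # and recurse until nothing is a ResultBase anymore.
    if isinstance(node, ResultBase):
        return resolve(node.get())
    if isinstance(node, (list, tuple)):
        return [resolve(child) for child in node]
    return node

resolve(master.delay())  # [3, 7]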
If there is a way of restructuring the master task to make retrieving the results easier, I am open to it, as long as the subtasks are launched in parallel. Any suggestions would be appreciated. Thank you in advance.
EDIT: A related issue is that if I want to retrieve the task results in a non-blocking way (for example, by manually polling r.status before calling r.get() or r.collect()), I cannot simply do this:
from celery.states import READY_STATES

r = master.delay()
# some time later...
if r.status in READY_STATES:
    r.get()  # returns the GroupResult wrapper, not [3, 7]
because r is an AsyncResult which resolves to a GroupResult, i.e. it completes before the GroupResult or its children. Is there a way to invoke the group so that it "skips" the top-level AsyncResult? This would solve both issues, since r.status and r.get() would then reflect the status and values of the child tasks.
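The best workaround I see under the current structure (a sketch only, and it still depends on knowing that the wrapper resolves to a GroupResult) is to poll in two steps:

if r.ready():                        # the wrapper task has finished
    group_result = r.get()           # <GroupResult: #1 [#2, #3]>
    if group_result.ready():         # have all of the children finished?
        values = group_result.get()  # [3, 7]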
Of course, the correct solution turned out to be the simplest one: call master as a function to execute it in the current process.
r = master()
r.get()     # [3, 7]
r.collect() # [(<GroupResult: #1 [#2, #3]>, [3, 7]),
            #  (<AsyncResult: #2>, 3),
            #  (<AsyncResult: #3>, 7)]
Instead of deferring the group startup code to a worker process, the group is started in the current process. Since the group itself is entirely asynchronous, the behavior does not change, and performance improves because there is no longer an extra master task to schedule and wait for.
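This also takes care of the polling issue from the edit: r is now the GroupResult itself, so a non-blocking check reflects the children directly (a small sketch):

r = master()
# some time later...
if r.ready():        # True only once every child task has finished
    print(r.get())   # [3, 7]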