
Chain a Celery task that returns a list into a group in the middle of a chain


This question is the same as this one: How to chain a Celery task that returns a list into a group?, except that I need this to happen in the middle of a chain, and the accepted solution only works if the intermediate task is the final "link" in the chain.

Here is the same example slightly modified that reproduces the issue:

from random import random

from celery import Celery, chain, group, subtask

app = Celery('tasks')  # broker/backend configuration omitted

@app.task
def get_list(amount):
    return [i for i in range(amount)]

@app.task
def process_item(item):
    return [f'id-{item}', random() > .5]

@app.task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()

@app.task
def handle_results(results):
    for result in results:
        if result[1] is None:
            continue

        return result[1] # return the first True value

def foo():
    return chain(
        get_list.s(10),
        dmap.s(process_item.s()),
        handle_results.s() # <-- if I add this, it fails
    )

# in a terminal, or somewhere
foo()()
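To be explicit about the data flow I'm after, here is the same pipeline with Celery stripped away and the tasks inlined as plain functions (a sketch only; the whole point of the question is making this run as distributed tasks):

```python
import random

def get_list(amount):
    # stage 1: produce the work items
    return list(range(amount))

def process_item(item):
    # stage 2: runs once per item
    return [f'id-{item}', random.random() > .5]

def handle_results(results):
    # stage 3: consumes the *list* of per-item results
    for result in results:
        if result[1] is None:
            continue
        return result[1]

results = [process_item(i) for i in get_list(10)]
print(handle_results(results))  # True or False, depending on random()
```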

The error I'm getting is this:

File "/usr/local/Cellar/python/3.7.4_1/Frameworks/Python.framework/Versions/3.7/lib/python3.7/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
kombu.exceptions.EncodeError: Object of type GroupResult is not JSON serializable

That is the return value of dmap, after all, and no, it can't be serialized. But note that if I did this:

>>> lst = [i for i in range(amount)]
>>> chain(group(process_item.s(i) for i in lst), handle_results.s())

then that would work. I'm confused about what actually needs to be passed from one member of the chain to the next, since the result of group(...) is:

>>> from app.manager_tasks import process_item
>>> group(process_item.s(e) for e in [1, 2, 3, 4])
group([app.manager_tasks.process_item(1), process_item(2), process_item(3), process_item(4)])
>>> group(process_item.s(e) for e in [1, 2, 3, 4]).delay()
<GroupResult: 07c9be1a-b3e3-4da2-af54-7177f3d91d0f [cf777f54-4763-46bd-a405-2c1993ddbf66, 103298fc-8f1f-4183-ba45-670224fcd319, 3ad87c2c-7b64-4309-a61b-e53ae17302b9, bf2766a3-662a-415d-a35b-037a0476f4a4]>

which is a GroupResult itself (once delay is called), otherwise just a group. Since dmap is a signature itself, I'm guessing that's why delay() needs to be called inside it for the chain... 🤔

If I invoke the group as done in the examples from the other Stack Overflow question (same link as above), I'm left with a GroupResult, which only succeeds if it's the last member of the chain (whether invoked with (), .delay(), or .apply_async()). If I call .get() on the GroupResult to get something serializable, I get the following error: RuntimeError: Never call result.get() within a task! Which presents me with a conundrum: how can I accomplish this?

Pretty stumped on this one.. but I'm also new to celery. Really appreciate any advice on how I could/should solve this!

A bit more background: I intend to use this chain repeatedly, as part of another chain that sits at the top level and specifies the stages of a pipeline.


Solution

  • As @DejanLekic mentioned, I should have been using a chord. This would solve the above:

    def foo():
        return chord(
            get_list.s(10),
            dmap.s(process_item.s())
        )(handle_results.s())
    

    I had wanted this to still be part of a chain, but it doesn't look like that is supported right now.
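As a mental model of why the chord fixes this (synchronous plain Python, no broker; this is not how Celery actually schedules anything): a chord runs the header tasks, collects their results into an ordinary list, and only then calls the body once with that list, so the body never sees a GroupResult:

```python
def run_chord(header, body):
    # Emulate chord semantics: run every header task, gather the
    # results in order, then call the body once with the plain list.
    results = [task() for task in header]
    return body(results)

# Hypothetical stand-ins for the real tasks
header = [lambda i=i: [f'id-{i}', i % 2 == 1] for i in range(4)]

def first_flag(results):
    # same shape as handle_results: return the first non-None flag
    for _name, flag in results:
        if flag is None:
            continue
        return flag

print(run_chord(header, first_flag))  # -> False (id-0's flag is False)
```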


    The below is less related to the question, though possibly useful to some.

    Using the solution from that GitHub issue thread, I can still do what I need (after the primary question was figured out) by nesting chords and chains. It's not the cleanest, but it works. It would look like this:

    def foo():
        return chord(
            get_list.s(10),
            dmap.s(process_item.s())
        )(chain(handle_results.s(), log_stuff.s()))
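log_stuff isn't defined anywhere in the thread; a minimal hypothetical version (name and behavior are my assumption) would receive whatever handle_results returned and pass it through so further chain links could still consume it:

```python
# @app.task  # registered as a Celery task in the real project
def log_stuff(value):
    # Hypothetical final link: record the outcome of handle_results
    # and return it unchanged for any later links in the chain.
    print(f'pipeline result: {value}')
    return value
```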