Pass results from preceding tasks to a later task in Celery


Here is my code. I want to pass the result of the myname task to the reverse task as an argument in its signature.

from app import app
from time import sleep
from celery.utils.log import get_task_logger
import os
from celery import signature, chain, group, chord
from celery.result import allow_join_result


MyQUEUE = os.getenv("SCANS_QUEUE")
logger = get_task_logger(__name__)

@app.task(queue=MyQUEUE, ignore_result=True)
def reverse(text):
    logger.info('reverse order {0}'.format(text))
    return {"reversename": str(text[::-1])}

@app.task(queue=MyQUEUE, ignore_result=True)
def add(a,b):
    logger.info('Addition --> a : {0} & b : {1} '.format(a,b))
    return {"addition": str(a+b)}

@app.task(queue=MyQUEUE, ignore_result=True)
def myname(a):
    logger.info('Name --> a : {0}'.format(a))
    return {"name": str(a)}


@app.task(queue=MyQUEUE, ignore_result=True)
def run_pipeline(a,b,n):
    resultchain = chain([
        group([
            signature(
                add,
                args=(a,b),
                queue=MyQUEUE
            ),
            signature(
                myname,
                args=(n,),
                queue=MyQUEUE
            )
        ]),
        signature(
            reverse,
            args=(-------),
            queue=MyQUEUE
        )
    ]).apply_async()

    with allow_join_result():
        # apply_async() on a chain returns an AsyncResult, which has get() rather than join()
        results = resultchain.get()
    return results

Solution

  • First and most important: if you use chain, group, starmap or any other task workflow, every task whose result is consumed later must be defined with ignore_result=False, or with the argument omitted (False is the default). Otherwise the return value is never stored and cannot be passed on. Here that applies at least to myname and add. Storing results also requires a configured result backend.

    @app.task(queue=MyQUEUE)
    def add(a,b):
        logger.info('Addition --> a : {0} & b : {1} '.format(a,b))
        return {"addition": str(a+b)}
    
    @app.task(queue=MyQUEUE)
    def myname(a):
        logger.info('Name --> a : {0}'.format(a))
        return {"name": str(a)}
    
    

    Next, for reverse to receive the results of the group containing add and myname, change reverse so that it accepts the group result, which is a list with one entry per task in the group.

    In a chain, the result of each task is passed as the first argument of the next task. Here the group result is passed to reverse as [{'addition': ...}, {'name': ...}], so you can pick out the value you need by index and key.

    @app.task(queue=MyQUEUE)
    def reverse(group_data):
        # group_data value: [{'addition': '3'}, {'name': 'VALUE'}]
        text = group_data[1]['name']
        logger.info('reverse order {0}'.format(text))
        return {"reversename": str(text[::-1])}
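
To make the shape of that argument concrete, here is a minimal sketch in plain Python, with no Celery involved, that mimics the hand-off: it collects the return values of the group members in order and passes the list to the next function as its first argument. The helper run_group_then is hypothetical and exists only for illustration; in real Celery the workers and the result backend do this for you.

```python
# Plain-Python stand-ins for the Celery tasks (no broker or worker needed).
def add(a, b):
    return {"addition": str(a + b)}


def myname(a):
    return {"name": str(a)}


def reverse(group_data):
    # group_data mirrors the group result: one entry per group member,
    # in the order the members were listed.
    text = group_data[1]["name"]
    return {"reversename": text[::-1]}


def run_group_then(members, callback):
    """Hypothetical helper: call each (func, args) pair, then pass the
    list of results to callback as its first positional argument."""
    results = [func(*args) for func, args in members]
    return callback(results)


result = run_group_then([(add, (1, 2)), (myname, ("celery",))], reverse)
print(result)
```

Indexing with group_data[1] works only because myname is the second member of the group; reordering the group changes the index.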
    

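    Finally, if you only want to reverse the result of myname, chain just myname and reverse. In that case reverse receives the dict returned by myname directly, not a list, so it should read the value with ['name'] instead of indexing into a list first.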

    resultchain = chain([
        signature(myname, args=(n,)),
        signature(reverse)
    ]).apply_async()
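
Since reverse receives a list after a group but a single dict after a lone task, one option is to normalise the input before reversing it. The following is a plain-Python sketch under that assumption; extract_name is a hypothetical helper, not part of Celery, and the function bodies would still need the @app.task decorator in the real project.

```python
def extract_name(payload):
    """Hypothetical helper: return the 'name' value from either a single
    task result (a dict) or a group result (a list of dicts)."""
    if isinstance(payload, dict):
        return payload["name"]
    for item in payload:
        if isinstance(item, dict) and "name" in item:
            return item["name"]
    raise KeyError("name")


def reverse(payload):
    # Works for both the two-step chain and the group-then-reverse workflow.
    text = extract_name(payload)
    return {"reversename": text[::-1]}


print(reverse({"name": "celery"}))
print(reverse([{"addition": "3"}, {"name": "celery"}]))
```

Looking the value up by key rather than by position also keeps reverse working if the order of the tasks in the group changes.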