Here is my code. I want to pass the result of the myname task to the reverse task, as an argument in its signature.
from app import app
from time import sleep
from celery.utils.log import get_task_logger
import os
from celery import signature, chain, group, chord
from celery.result import allow_join_result
MyQUEUE = os.getenv("SCANS_QUEUE")
logger = get_task_logger(__name__)

@app.task(queue=MyQUEUE, ignore_result=True)
def reverse(text):
    logger.info('reverse order '.format(text))
    return {"reversename": str(text[::-1])}

@app.task(queue=MyQUEUE, ignore_result=True)
def add(a,b):
    logger.info('Addition --> a : {0} & b : {1} '.format(a,b))
    return {"addition": str(a+b)}

@app.task(queue=MyQUEUE, ignore_result=True)
def myname(a):
    logger.info('Name --> a : {0}'.format(a))
    return {"name": str(a)}

@app.task(queue=MyQUEUE, ignore_result=True)
def run_pipeline(a,b,n):
    resultchain = chain([
        group([
            signature(
                add,
                args=(a,b),
                queue=MyQUEUE
            ),
            signature(
                myname,
                args=(n),
                queue=MyQUEUE
            )
        ]),
        signature(
            reverse,
            args=(-------),
            queue=MyQUEUE
        )
    ]).apply_async()
    with allow_join_result():
        results = resultchain.join()
    return results

First and most important: if you are going to use chain, group, starmap, or another kind of task workflow, the tasks whose results will be used later need to be declared with ignore_result=False, or with the argument omitted (the default value is False). The result has to be stored, at least for myname and add:

@app.task(queue=MyQUEUE)
def add(a,b):
    logger.info('Addition --> a : {0} & b : {1} '.format(a,b))
    return {"addition": str(a+b)}

@app.task(queue=MyQUEUE)
def myname(a):
    logger.info('Name --> a : {0}'.format(a))
    return {"name": str(a)}

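To see why the stored result matters, here is a toy model in plain Python. It is not Celery's implementation: `backend`, `run_task`, and `collect` are hypothetical stand-ins, and real Celery behaves differently in detail when results are missing. It only illustrates that a step which gathers the group's results has nothing to read if the tasks ignored them.

```python
# Toy stand-in for a result backend: task id -> stored return value.
# (Hypothetical names; this does not use or mirror Celery internals.)
backend = {}


def run_task(task_id, func, args, ignore_result=False):
    """Run func and store its return value unless results are ignored."""
    value = func(*args)
    if not ignore_result:
        backend[task_id] = value
    return task_id


def collect(task_ids):
    """Gather stored results for a group; missing entries come back as None."""
    return [backend.get(task_id) for task_id in task_ids]


def add(a, b):
    return {"addition": str(a + b)}


def myname(a):
    return {"name": str(a)}


# Results are stored, so the next step can read them.
stored = collect([
    run_task("add-1", add, (1, 2)),
    run_task("myname-1", myname, ("Alice",)),
])

# Results are ignored, so there is nothing to hand over.
ignored = collect([
    run_task("add-2", add, (1, 2), ignore_result=True),
    run_task("myname-2", myname, ("Alice",), ignore_result=True),
])

print(stored)
print(ignored)
```
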
Now, for reverse to obtain the results of the group of add and myname, you need to adjust reverse to handle the group result (a list of the results). In a chain, the result of a task is passed as the first argument of the next task. In this case the group results are passed as the first argument of the reverse task as [{'addition': ...}, {'name': ...}], and from that list you can pick the value you need.

@app.task(queue=MyQUEUE)
def reverse(group_data):
    # group_data value: [{'addition': '3'}, {'name': 'VALUE'}]
    # Index 1 is the myname result, because myname is the second task in the group.
    text = group_data[1]['name']
    logger.info('reverse order {0}'.format(text))
    return {"reversename": str(text[::-1])}

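The data flow can be traced in plain Python, without a broker or worker. This is only a sketch of what the group hands to reverse: the functions are undecorated copies of the tasks, and the inputs (1, 2, "Alice") are made-up example values.

```python
def add(a, b):
    return {"addition": str(a + b)}


def myname(a):
    return {"name": str(a)}


def reverse(group_data):
    # group_data is the list of group results, in the order the
    # signatures were listed in the group: [add result, myname result].
    text = group_data[1]["name"]
    return {"reversename": str(text[::-1])}


# Stand-in for what the group delivers to the next task in the chain.
group_data = [add(1, 2), myname("Alice")]
result = reverse(group_data)

print(group_data)
print(result)
```

Because the list follows the order of the signatures in the group, swapping add and myname in the group would also require changing the index used in reverse.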
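Finally, if you only want to reverse the result of myname, you have to chain only myname and reverse. Note that reverse then receives the myname result ({'name': ...}) directly as its first argument, not a list.
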
resultchain = chain([
    signature(myname, args=(n,)),
    signature(reverse)
]).apply_async()
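
For this two-step chain, the value reaching reverse is the dict returned by myname. A minimal plain-Python sketch of that hand-off follows. `reverse_name` is a hypothetical variant of reverse written for this case, and "Alice" is a made-up input.

```python
def myname(a):
    return {"name": str(a)}


def reverse_name(data):
    # data is the dict returned by myname, e.g. {'name': 'VALUE'}.
    text = data["name"]
    return {"reversename": str(text[::-1])}


# Stand-in for the chain: the first task's result becomes
# the first argument of the second task.
first_result = myname("Alice")
final_result = reverse_name(first_result)

print(final_result)
```
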