I'm new to Celery and want to implement a basic example of a chain containing four tasks. Each task passes its return value to the next task in the chain. When I run the Celery worker, I get the following error:
ERROR/ForkPoolWorker-4] Task run-chain[...] raised unexpected: TypeError("unsupported operand type(s) for |: 'chain' and 'AsyncResult'")
Here is my simple chain with the tasks:
from celery import chain, signature
from celery import shared_task

@shared_task(name = 'add')
def add(x, y):
    return x + y

@shared_task(name = 'substract')
def substract(x, y):
    return x - y

@shared_task(name = 'multiply')
def multiply(x, y):
    return x * y

@shared_task(name = 'divide')
def divide(x, y):
    return x / y

@shared_task(name = 'run-chain')
def run_chain(x, y):
    canvas = chain(
        add.s(x, y).apply_async(),
        substract.s(x, y).apply_async(),
        multiply.s(x, y).apply_async(),
        divide.s(x, y).apply_async()
    )
    return canvas
And this is my celery.py file:
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myfirsttask.settings')

app = Celery()
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

app.conf.beat_schedule = {
    'run-every-five-seconds': {
        'task': 'run-chain',
        'schedule': 5.0,
        'args': (2, 3)
    }
}
Your help is much appreciated!
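chain is meant to be built from task signatures, not from the results of tasks that have already been sent. Calling apply_async() on each signature returns an AsyncResult, which is what the TypeError is complaining about. Pass the signatures themselves, then call apply_async() once on the whole chain: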
@shared_task(name = 'run-chain')
def run_chain(x, y):
    canvas = chain(
        add.s(x, y),
        substract.s(x, y),
        multiply.s(x, y),
        divide.s(x, y)
    ).apply_async()
    return canvas
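Even so, this version shouldn't work as written, because the result of the previous task is passed as the first argument to the next task. That means substract.s(x, y) would be called with three arguments instead of two. Maybe you want to do something like this: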
@shared_task(name = 'run-chain')
def run_chain(x, y):
    canvas = chain(
        add.s(x, y),
        substract.s(y),
        multiply.s(y),
        divide.s(y)
    ).apply_async()
    return canvas
Or something of the sort.
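To check which arguments each task ends up with, here is a minimal plain-Python sketch of the argument passing described above. It is only a model and does not use Celery: Sig and run_chain_model are hypothetical names made up for this illustration, and the four functions are undecorated copies of the tasks from the question.

```python
# Plain-Python model of how a chain threads values; this is NOT Celery itself.
# Sig and run_chain_model are hypothetical names used only for illustration.

def add(x, y):
    return x + y

def substract(x, y):
    return x - y

def multiply(x, y):
    return x * y

def divide(x, y):
    return x / y

class Sig:
    """Stands in for a signature such as add.s(...): a function plus stored args."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def call(self, *prepended):
        # The previous result (if any) goes in front of the stored arguments.
        return self.func(*prepended, *self.args)

def run_chain_model(*sigs):
    """Run the first step as-is, then feed each result into the next step."""
    first, *rest = sigs
    result = first.call()
    for sig in rest:
        result = sig.call(result)
    return result

x, y = 2, 3
final = run_chain_model(
    Sig(add, x, y),      # add(2, 3)
    Sig(substract, y),   # substract(<previous result>, 3)
    Sig(multiply, y),    # multiply(<previous result>, 3)
    Sig(divide, y),      # divide(<previous result>, 3)
)
print(final)  # (((2 + 3) - 3) * 3) / 3 = 2.0
```

In this model, Sig(substract, x, y).call(5) raises a TypeError because substract receives three positional arguments, which mirrors why only the first signature in the chain should carry both x and y.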