I'm attempting to run a group of chains in Celery. I create a set of chains:
chains = [celery.chain(a.s(i), b.s(), c.s()) for i in items]
and then wrap this in a group:
group = celery.group(*chains)
The expectation here is that Celery will then schedule each full chain to run as an independent task. And indeed, logically, that's what appears to happen. But there are two problems:
If the number of chains is large, nothing seems to run. No errors in the Celery or rabbitmq consoles. (Yes, using rabbitmq.)
Celery appears to execute the first task of each chain across all tasks in the group before moving to the second task of each chain. (That is, it appears to unwrap the chains into a group of a tasks, then b tasks, and then c tasks. They are still linked to their corresponding chain entries, but this introduces a delay when certain a tasks complete much more quickly than others.)
Any ideas what's going on?
I've written code to test your case with an in-memory backend and just one process (it's at the bottom). I ran it with:
celery -A module-name worker --loglevel=info -c 10
Barrier-like behaviour: That does not seem to be a problem. If you apply different sleeps, or execute a lot of tasks with high parallelism, you'll see that b and c tasks are executed in parallel with a tasks.
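To make the point about parallelism concrete, here is a minimal sketch that uses a plain thread pool as an analogy; it is not Celery and does not model its broker or prefetching. The `simulate` helper and its parameters are hypothetical names for illustration. Each step queues its successor only once it has finished, so a fast chain can reach c while a slow chain is still in a.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def simulate(delays, workers):
    """Run one a -> b -> c chain per delay; return (task, chain) start events in order."""
    events = []
    lock = threading.Lock()
    finished = threading.Semaphore(0)
    pool = ThreadPoolExecutor(max_workers=workers)

    def step(names, i, delay):
        with lock:
            events.append((names[0], i))
        time.sleep(delay)
        if len(names) > 1:
            # The successor is only queued once its predecessor has finished.
            pool.submit(step, names[1:], i, delay)
        else:
            finished.release()

    try:
        for i, delay in enumerate(delays):
            pool.submit(step, ("a", "b", "c"), i, delay)
        # Wait until the last step of every chain has completed.
        for _ in delays:
            finished.acquire()
    finally:
        pool.shutdown()
    return events


if __name__ == "__main__":
    # Chain 0 is fast, chain 1 is slow; with two workers, chain 0 does not wait.
    print(simulate([0.0, 0.3], workers=2))
```

With fewer workers than chains, the b steps tend to queue behind the remaining a steps, which resembles the ordering described in the question; with enough workers the steps interleave.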
Failing on big chains: when I try to create 1000000 chains, the code silently fails while creating the chains, so it looks more like a Python memory problem than a Celery one. Groups of 100000 chains are fine.
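If memory during chain creation is indeed the limit, one possible workaround is to build and submit the chains in smaller batches rather than as one huge group. The sketch below is only an assumption about how that could look: `in_batches` is a hypothetical helper, the batch size of 1000 is arbitrary, and the Celery calls are left as comments because they need a running app and worker.

```python
from itertools import islice


def in_batches(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


# Hypothetical usage with the tasks from the question (not run here):
#
#     lazy_chains = (chain(a.s(i), b.s(), c.s()) for i in items)
#     results = [group(batch).apply_async() for batch in in_batches(lazy_chains, 1000)]

if __name__ == "__main__":
    print(list(in_batches(range(5), 2)))  # [[0, 1], [2, 3], [4]]
```

Because the chains come from a generator, only one batch of signatures exists in memory at a time; the trade-off is that you get several group results to collect instead of one.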
from celery import Celery, chain, group
from pprint import pprint
import threading
from time import sleep
app = Celery('chaintext')
app.conf.update(
    BROKER_BACKEND='memory',
    CELERY_RESULT_BACKEND='cache',
    CELERY_CACHE_BACKEND='memory',
    CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_ENABLE_UTC=True,
    CELERYD_POOL='celery.concurrency.threads:TaskPool',
)
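
@app.task
def a(i):
    result = 'A %s' % i
    sleep((i % 3) / 10.0)
    pprint(result)
    return result


# In a chain, the previous task's return value is prepended to the
# arguments, so the first parameter of b and c receives that result.
@app.task
def b(prev, i):
    result = 'B %s' % i
    sleep((i % 3) / 10.0)
    pprint(result)
    return result


@app.task
def c(prev, i):
    result = 'C %s' % i
    sleep((i % 3) / 10.0)
    pprint(result)
    return result
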
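
def main():
    print("MAIN")
    sleep(5)  # pause before queuing any work
    print("STARTING")
    # One a -> b -> c chain per item; lower the count if creation fails silently.
    chains = [chain(a.s(i), b.s(i), c.s(i)) for i in range(1000000)]
    print("CREATED CHAINS")
    g = group(*chains)
    print("CREATED GROUP")
    result = g.apply_async()
    print("QUEUED GROUP")
    print(result.get())


# Queue the work from a background thread once the module is loaded.
t1 = threading.Thread(target=main)
t1.start()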