
Running a group of chains


I'm attempting to run a group of chains in Celery. I create a set of chains:

chains = [celery.chain(a.s(i), b.s(), c.s()) for i in items]

and then wrap these in a group:

group = celery.group(*chains)
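
For completeness, the end-to-end shape is roughly this (a minimal sketch: the broker URL, the rpc result backend, and the no-op task bodies are placeholders standing in for my real setup):

import celery

# RabbitMQ broker; an rpc result backend so .get() works in this sketch
app = celery.Celery('repro', broker='amqp://', backend='rpc://')

# no-op stand-ins for the real tasks, just to show the pipeline shape
@app.task
def a(i):
    return i

@app.task
def b(prev):
    return prev

@app.task
def c(prev):
    return prev

items = range(10)
chains = [celery.chain(a.s(i), b.s(), c.s()) for i in items]
result = celery.group(*chains).apply_async()
print(result.get())  # expect one final c() result per chain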

The expectation here is that Celery will then schedule each full chain to run as an independent task. And indeed, logically, that's what appears to happen. But there are two problems:

  1. If the number of chains is large, nothing seems to run, and there are no errors in the Celery or RabbitMQ consoles. (Yes, I'm using RabbitMQ as the broker.)

  2. Celery appears to execute the first task of each chain across all chains in the group before moving on to the second task of each chain. (That is, it appears to unwrap the chains into a group of task a's, then task b's, then task c's. The tasks are still linked to their corresponding chain entries, but this introduces a delay when some task a's complete much more quickly than others.)

Any ideas what's going on?


Solution

  • I've written code to test your case with the memory backend and just one process (it's at the bottom), run with: celery -A module-name worker --loglevel=info -c 10

    1. Barrier-like behaviour: that does not seem to be a problem. If you apply different sleeps, or execute a lot of tasks with high parallelism, you'll see that b and c tasks execute in parallel with a tasks.

    2. Failing on big chains: when I try to create 1000000 chains, the code actually fails silently while creating the chains, so it looks more like a Python memory problem; 100000 chains are fine. One workaround is sketched below.
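
    If a huge number of chains is really needed, a possible workaround (just a sketch, reusing the a, b, c tasks from the code below; queue_in_chunks is my name, not a Celery API) is to build and queue the group in chunks, so the full list of signatures never has to exist in memory at once:

    from celery import chain, group

    def queue_in_chunks(n, chunk_size=100000):
        # dispatch the n chains as a series of smaller groups
        group_results = []
        for start in range(0, n, chunk_size):
            chunk = [chain(a.s(i), b.s(i), c.s(i))
                     for i in range(start, min(start + chunk_size, n))]
            group_results.append(group(*chunk).apply_async())
        return group_results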


    The code

    from celery import Celery, chain, group
    from pprint import pprint
    import threading
    from time import sleep
    
    app = Celery('chaintext')
    app.conf.update(
        BROKER_BACKEND='memory',              # in-memory transport: no RabbitMQ needed for the test
        CELERY_RESULT_BACKEND='cache',
        CELERY_CACHE_BACKEND='memory',
        CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
        CELERY_TASK_SERIALIZER='json',
        CELERY_ACCEPT_CONTENT=['json'],       # ignore other content
        CELERY_ENABLE_UTC=True,
        CELERYD_POOL='celery.concurrency.threads:TaskPool'  # thread pool: one process, many concurrent tasks
    )
    
    @app.task
    def a(i):
        result = 'A %s' % i
        sleep((i % 3) / 10.0)  # stagger runtimes so the interleaving is visible
        pprint(result)
        return result


    @app.task
    def b(prev, i):
        # 'prev' receives the return value of the preceding task in the chain
        result = 'B %s' % i
        sleep((i % 3) / 10.0)
        pprint(result)
        return result


    @app.task
    def c(prev, i):
        # 'prev' receives the return value of the preceding task in the chain
        result = 'C %s' % i
        sleep((i % 3) / 10.0)
        pprint(result)
        return result
        
    def main():
        print("MAIN")
        sleep(5)  # give the worker a moment to start up
        print("STARTING")
        # 1000000 chains fails silently while being built (see point 2 above);
        # drop this to 100000 to watch it run
        chains = [chain(a.s(i), b.s(i), c.s(i)) for i in range(1000000)]
        print("CREATED CHAINS")
        g = group(*chains)
        print("CREATED GROUP")
        result = g.apply_async()
        print("QUEUED GROUP")
        print(result.get())

    t1 = threading.Thread(target=main)
    t1.start()
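
    With the thread pool and -c 10, the single worker process runs up to ten tasks concurrently, which is what makes the interleaving of a, b and c visible in the output; result.get() then returns one final c result per chain.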