Tags: python, celery, django-celery

celery - chaining groups and subtasks -> out-of-order execution


When I have something like the following:

group1 = group(task1.si(), task1.si(), task1.si())
group2 = group(task2.si(), task2.si(), task2.si())

workflow = chain(group1, group2, task3.si())

The intuitive interpretation is that task3 should only execute after all of the tasks in group2 have finished.

In reality, task3 executes while group1 has started but has not yet completed.

What am I doing wrong?
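
For reference, here is a self-contained version of the snippet above that reproduces this (the task bodies and the final apply_async call are my own placeholders; it assumes the same already-running Celery setup used by the answer's sample code below):

# hypothetical reproduction -- task bodies are placeholders
from celery import task, group, chain

@task()
def task1():
    return 'task1'

@task()
def task2():
    return 'task2'

@task()
def task3():
    return 'task3'

group1 = group(task1.si(), task1.si(), task1.si())
group2 = group(task2.si(), task2.si(), task2.si())

# expected: task3 runs only after every task in group2 has finished
# observed: task3 fires while group1 is still running
workflow = chain(group1, group2, task3.si())
result = workflow.apply_async()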


Solution

  • So, as it turns out, in Celery you cannot chain two groups together.
    I suspect this is because a group chained with a task automatically becomes a chord
    --> Celery docs: http://docs.celeryproject.org/en/latest/userguide/canvas.html

    Chaining a group together with another task will automatically upgrade it to be a chord:

    Groups return a parent task. When chaining two groups together, I suspect that when the first group completes, the chord starts its callback "task", and that this "task" is actually the parent task of the second group. I further suspect that this parent task completes as soon as it has finished kicking off all of the subtasks within the group, and as a result the next item after the second group starts executing immediately.
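
    As a side note from me (not part of the original answer), the implicit upgrade can be sketched like this; the add task and its arguments are placeholders only:

    from celery import task, group, chord

    @task()
    def add(x, y):
        return x + y

    header = group(add.si(1, 1), add.si(2, 2))
    callback = add.si(0, 0)

    # chaining a group with a single following task is upgraded to a chord when
    # it runs, so these two signatures describe the same workflow
    implicit = header | callback
    explicit = chord(header, callback)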

    To demonstrate this, here is some sample code. You'll need to already have a running Celery instance.

    # celery_experiment.py
    
    from celery import task, group, chain, chord
    from celery.signals import task_sent, task_postrun, task_prerun
    
    import time
    import logging
    
    import random
    random.seed()
    
    logging.basicConfig(level=logging.DEBUG)
    
    ### HANDLERS ###    
    @task_prerun.connect()
    def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):    
        try:
            logging.info('[%s] starting' % kwargs['id'])
        except KeyError:
            pass
    
    @task_postrun.connect()
    def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
        try:    
            logging.info('[%s] finished' % kwargs['id'])
        except KeyError:
            pass
    
    
    def random_sleep(id):
        slp = random.randint(1, 3)
        logging.info('[%s] sleep for %ssecs' % (id, slp))
        time.sleep(slp)
    
    @task()
    def thing(id):
        logging.info('[%s] begin' % id)
        random_sleep(id)
        logging.info('[%s] end' % id)
    
    
    def exec_exp():
        st = thing.si(id='st')
        st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c'),]
        st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b'),]
        st2 = thing.si(id='st2')
        st3 = thing.si(id='st3')
        st4 = thing.si(id='st4')
    
        grp1 = group(st_arr)
        grp2 = group(st_arr2)
    
        # chn works: the two groups are separated by a single subtask (st2)
        chn = (st | grp1 | st2 | grp2 | st3 | st4)

        # chn2 chains the two groups directly; if run, st3 will start before grp2 finishes
        #chn2 = (st | st2 | grp1 | grp2 | st3 | st4)
    
        r = chn()
        #r2 = chn2()
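
    As a footnote (this workaround sketch is mine, not part of the original answer): separating the two groups with a trivial intermediate task gives each group its own chord callback, mirroring the working chn chain above, so everything runs in the expected order. The barrier task and the w_* ids are illustrative names only.

    # appended to celery_experiment.py
    @task()
    def barrier():
        pass

    def exec_workaround():
        grp1 = group(thing.si(id='w_a'), thing.si(id='w_b'))
        grp2 = group(thing.si(id='w_c'), thing.si(id='w_d'))

        # grp2 starts only after grp1's chord callback (barrier) has run,
        # and the final task waits on grp2 in the same way
        wf = (grp1 | barrier.si() | grp2 | barrier.si() | thing.si(id='w_end'))
        return wf()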