
Design a workflow in Celery using chain and group


I am new to Celery and I am trying to design a workflow using chain, group and chord. Here is what I have done so far:

from celery import chain

def __chainfileprocessing(config):
    filelist, src = get_files_for_processing(config)
    for fileattr in filelist:
        # Build and submit one three-step chain per file.
        chain(
            download_file.s(fileattr, src),
            importdata.s(fileattr, src),
            post_processing.s(fileattr, src),
        ).apply_async()

Current execution order:

  • All the download_file() tasks are executed
  • Then all the importdata() tasks are executed
  • Then all the post_processing() tasks are executed

What I need:

Tasks should be executed in download_file() => importdata() => post_processing() order for each item in filelist.


Solution

  • Your code already does what you say you want. Each chain runs download_file, then importdata, then post_processing, in that order, for its item in filelist.

    What your code does is:

    • start a chain of tasks (download_file, then importdata, then post_processing) for file A asynchronously; this starts download_file for file A. When that completes, it will start the importdata task for file A. The apply_async returns immediately; it doesn't wait for any of the tasks to complete.
    • start a chain of tasks (download_file, then importdata, then post_processing) for file B asynchronously; this starts download_file for file B. When that completes, it will start the importdata task for file B. The download_file task for file A is likely running, but this call doesn't know that; it just adds a task to the queue.
    • etc

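    At the end of the loop, all that has been sent to Celery is one download_file task per file (A through n). As each download_file task completes, the next task in its chain is submitted. If there are fewer workers than files, the queued download_file tasks sit ahead of those follow-up tasks, which is why the downloads appear to run as a batch first.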

    There are no dependencies between the tasks for different files, and from the code you've posted it doesn't look like there need to be (why should file B have to wait for file A to complete?).
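
The queueing behaviour described above can be illustrated with a small simulation. This is only a toy model, not Celery itself: it assumes a single worker draining a strict first-in, first-out queue, and the simulate function and STEPS list are hypothetical names introduced for the sketch. Real deployments with several workers or prefetching will interleave tasks differently, but the per-file order is preserved in the same way.

```python
from collections import deque

# Hypothetical stand-in for the three-step chain in the question.
STEPS = ["download_file", "importdata", "post_processing"]


def simulate(files, steps=STEPS):
    """Simulate one worker draining a FIFO queue of chained tasks.

    Returns the (step, file) pairs in the order they would run.
    """
    queue = deque()
    # The loop over filelist: submitting a chain only enqueues its first task.
    for name in files:
        queue.append((0, name))

    executed = []
    while queue:
        step_index, name = queue.popleft()
        executed.append((steps[step_index], name))
        # When a task finishes, the next task of the same chain is enqueued.
        if step_index + 1 < len(steps):
            queue.append((step_index + 1, name))
    return executed


order = simulate(["A", "B", "C"])
for step, name in order:
    print(f"{step}({name})")
```

In this model every download_file runs before any importdata, which matches the execution order observed in the question, yet for each individual file the three steps still run in chain order.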