I am new to Celery and I am trying to design a workflow using chains, groups and chords. Here is what I have done so far:
from celery import chain

def __chainfileprocessing(config):
    filelist, src = get_files_for_processing(config)
    for fileattr in filelist:
        # One independent chain per file; apply_async() queues it and returns at once.
        chain(
            download_file.s(fileattr, src),
            importdata.s(fileattr, src),
            post_processing.s(fileattr, src),
        ).apply_async()
Current execution order:
What I need:
Tasks should be executed in the order download_file() => importdata() => post_processing() for each item in filelist.
Your code is doing what you said you want. The chain will start tasks for download_file, then importdata, then post_processing, in that order, for each item in filelist.
What your code does is:
1. Start a chain (download_file, then importdata, then post_processing) for file A asynchronously; this starts download_file for file A. When that completes, it will start the importdata task for file A. The apply_async call returns immediately; it doesn't wait for any of the tasks to complete.
2. Start a chain (download_file, then importdata, then post_processing) for file B asynchronously; this starts download_file for file B. When that completes, it will start the importdata task for file B. The download_file task for file A is likely running, but this call doesn't know that; it just adds a task to the queue.
At the end of the loop, what you've sent to Celery is a download_file task for each of files A...n. As each download_file task completes, it will submit the next task in its chain.
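To see why the downloads for every file can show up before any import, here is a toy model of the steps above. It is not Celery: it uses only the standard library and assumes a single worker draining one FIFO queue, which is a simplification of what a real broker and worker pool do. The simulate function and the STEPS list are hypothetical names made up for this sketch; only the three task names come from your code.

```python
from collections import deque

# Task names taken from the question; everything else here is illustrative.
STEPS = ["download_file", "importdata", "post_processing"]

def simulate(files):
    """Toy model: one worker draining a FIFO queue of (step_index, file) jobs."""
    queue = deque()
    # The for-loop with apply_async(): only the first task of each chain is queued.
    for f in files:
        queue.append((0, f))
    executed = []
    while queue:
        step, f = queue.popleft()
        executed.append((STEPS[step], f))
        # When a task finishes, the next task in the same chain is queued.
        if step + 1 < len(STEPS):
            queue.append((step + 1, f))
    return executed

order = simulate(["A", "B"])
for name, f in order:
    print(name, f)
```

In this model the tasks for different files interleave, but within each file the order is still download_file, then importdata, then post_processing. With several workers the interleaving would differ, while the per-file order would stay the same.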
There are no dependencies between the tasks for different files; from the code you've posted, it doesn't look like there needs to be. (Why should file 2 have to wait for file 1 to complete?)