I am trying to implement some Celery chains/groups/chords using Django 3.0, Celery 4.3, Redis and Python 3.6. From the documentation, I understood that tasks in a group run in parallel and tasks in a chain run sequentially, but I am not observing that behavior.
I have this chain of task signatures:
transaction.on_commit(lambda: chain(clean, group(chain(faces_1, faces_2), folder, hashes), change_state_task.si(document_id, 'ready')).delay())
where I expect the change_state_task to wait for all the other tasks to complete before it starts. This did not work: the change_state_task started before hashes finished. All the tasks ran and completed successfully.
I then tried this chain:
transaction.on_commit(lambda: chain(clean, faces_1, faces_2, folder, hashes, change_state_task.si(document_id, 'ready')).delay())
where all the signatures are in one long chain. However, the change_state_task still starts before the hashes task finishes.
I even tried using change_state_task.s(document_id, 'ready') (replaced si with s), thinking that the change_state_task could not start without the output of the hashes task. But it still starts before hashes ends.
I then tried using task.s versus task.si for all the task signatures, and the change_state_task still started before the hashes task ended.
What am I missing?
Thanks!
Mark
PS Apologies for not being clear on my task signatures. I have a long Python method that figures out what tasks have to be run. It looks something like this:
from celery import chain
from django.conf import settings
from django.db import transaction

@app.task(bind=True)
def noop(self, message):
    # Task accepts a string and does nothing
    logger.debug(message)
    return True

def figure_out_which_tasks_to_fire(document_id):
    # Default every step to a no-op placeholder
    clean = noop.si("replaces clean_document_image task")
    faces_1 = noop.si("replaces find_faces_task task")
    faces_2 = noop.si("replaces recognize_face_task task")
    folder = noop.si("replaces update_source_folder task")
    hashes = noop.si("replaces compute_image_descriptor_task task")
    # Swap in the real task for each step that is actually needed
    if clean_needed:
        clean = clean_document_image.s(document_id, key, value)
    if faces_needed:
        faces_1 = find_faces_task.s(document_id)
        faces_2 = recognize_face_task.s(document_id)
    if folder_needed:
        folder = update_source_folder.s(document_id, file_name, source_folder)
    if hashes_needed:
        hashes = compute_image_descriptor_task.s(settings.DEFAULT_SIMILAR_IMAGE, document_id, hash_name)
    # Finished figuring out what needs to be done, so do the tasks
    # and then update the state of the document.
    transaction.on_commit(lambda: chain(clean, faces_1, faces_2, folder, hashes, change_state_task.s(document_id, 'ready')).delay())
I need the transaction.on_commit because all the tasks read and write the Django app's backend MySQL database.
I've had issues with Celery automatically transforming chained groups into chords. Try using the chord() function explicitly.