I have a Django REST Framework project where I need to call two Celery tasks sequentially. Specifically, I need to call first_function
and then after some operations, call the second_function
, ensuring that second_function
runs only after first_function
has completed.
# tasks.py
from celery import shared_task
@shared_task
def first_function():
# Logic for the first function
@shared_task
def second_function():
# Logic for the second function
This is my view.py
first_function.delay()
# some codes ....
# now i need to call my second_function.delay()
second_function.delay()
I can't call chain(first_function.s(), second_function.s()).delay()
because I need to call first_function
at the beginning of my code. In case of an exception, first_function
should run independently. However, if everything works correctly, I want to call second_function
only after ensuring that first_function
has completed.
My concerns are:
second_function
correctly wait for the corresponding first_function
to complete.second_function
runs after the specific first_function
related to the same request?(Note: I can't add sleep or any blocking code in the middle of my code.)Any guidance or best practices for handling this scenario with Celery would be greatly appreciated!
You can read the status of first_task at the beginning of second_task and wait till the first_task state is 'SUCCESS'.
from celery.result import AsyncResult
first_task = first_function.delay()
# some codes ....
while True:
# Poll the first task until it's completed
result = AsyncResult(first_task.id)
if result.state == 'SUCCESS':
# If the first task is successful, call the second task
second_task = second_function.delay(result.result)
elif result.state in ['FAILURE', 'REVOKED']:
break
# Sleep for a short while before checking again
time.sleep(2)
If you don't want to use sleep then you can introduce a 3rd task to avoid blocking main thread
from celery import shared_task
from celery.result import AsyncResult
@shared_task
def first_function():
# Your first function logic here
pass
@shared_task
def second_function(result):
# Your second function logic here
# 'result' is the result of first_function
pass
@shared_task(bind=True)
def monitor_tasks(self, first_task_id):
# Monitor the first task
result = AsyncResult(first_task_id)
if result.state == 'SUCCESS':
# If the first task is successful, call the second task
second_function.delay(result.result)
elif result.state in ['FAILURE', 'REVOKED']:
# If the first task failed or was revoked, stop monitoring
print("First task failed or was revoked. Not triggering the second task.")
else:
# Retry the monitoring task if the first task is not finished yet
self.retry(countdown=2, max_retries=None)
# Start the first function asynchronously
first_task = first_function.delay()
# some codes ....
# Start monitoring task
monitor_tasks.delay(first_task.id)