
Ensure Sequential Task Execution with Celery in Django REST Framework?


I have a Django REST Framework project where I need to call two Celery tasks sequentially. Specifically, I need to call first_function and then after some operations, call the second_function, ensuring that second_function runs only after first_function has completed.

# tasks.py
from celery import shared_task

@shared_task
def first_function():
    # Logic for the first function
    pass

@shared_task
def second_function():
    # Logic for the second function
    pass

This is my view.py

first_function.delay()

# some other code ....
# now I need to call second_function.delay()

second_function.delay()

I can't call chain(first_function.s(), second_function.s()).delay() because I need to call first_function at the beginning of my code. In case of an exception, first_function should run independently. However, if everything works correctly, I want to call second_function only after ensuring that first_function has completed.

My concerns are:

  • If multiple requests hit the view simultaneously, I want each second_function to wait for its corresponding first_function to complete.
  • I'm a little confused about how to ensure that second_function runs after the specific first_function that belongs to the same request. (Note: I can't add sleep or any other blocking code in the middle of my code.)

Any guidance or best practices for handling this scenario with Celery would be greatly appreciated!


Solution

  • You can poll the state of the first task and call second_function only once that state is 'SUCCESS'. Each delay() call returns its own task id, so concurrent requests each track their own first_function.

    import time

    from celery.result import AsyncResult
    
    first_task = first_function.delay()
    
    # some codes ....
    
    while True:
        # Poll the first task until it has finished
        result = AsyncResult(first_task.id)
        if result.state == 'SUCCESS':
            # The first task succeeded: call the second task once, then stop polling
            second_task = second_function.delay(result.result)
            break
        elif result.state in ['FAILURE', 'REVOKED']:
            # The first task failed or was revoked: do not call the second task
            break
        
        # Sleep for a short while before checking again
        time.sleep(2)
    

    If you don't want to use sleep, you can introduce a third task that monitors the first one, so the view itself is never blocked:

    from celery import shared_task
    from celery.result import AsyncResult
    
    @shared_task
    def first_function():
        # Your first function logic here
        pass
    
    @shared_task
    def second_function(result):
        # Your second function logic here
        # 'result' is the result of first_function
        pass
    
    @shared_task(bind=True)
    def monitor_tasks(self, first_task_id):
        # Monitor the first task
        result = AsyncResult(first_task_id)
    
        if result.state == 'SUCCESS':
            # If the first task is successful, call the second task
            second_function.delay(result.result)
        elif result.state in ['FAILURE', 'REVOKED']:
            # If the first task failed or was revoked, stop monitoring
            print("First task failed or was revoked. Not triggering the second task.")
        else:
            # Retry the monitoring task if the first task is not finished yet
            raise self.retry(countdown=2, max_retries=None)
    
    
    # Start the first function asynchronously
    first_task = first_function.delay()
    
    # some codes ....
    
    # Start monitoring task
    monitor_tasks.delay(first_task.id)
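
The branching inside monitor_tasks above can also be pulled out into a small pure function, which makes it easy to check without a broker or worker. This is only a sketch: next_action and its return labels ("dispatch", "stop", "retry") are hypothetical names that are not part of Celery, and the states are written as plain strings so the snippet runs without Celery installed.

```python
# Celery state names as plain strings, so this sketch needs no Celery import.
FINISHED_OK = "SUCCESS"
GAVE_UP = {"FAILURE", "REVOKED"}


def next_action(state: str) -> str:
    """Return what a monitoring task should do for a given first-task state.

    The return values "dispatch", "stop" and "retry" are hypothetical labels
    used only in this sketch; they are not Celery constants.
    """
    if state == FINISHED_OK:
        return "dispatch"  # first task finished: safe to start second_function
    if state in GAVE_UP:
        return "stop"      # first task will not succeed: never start the second
    return "retry"         # PENDING, STARTED, RETRY, ...: check again later


if __name__ == "__main__":
    for state in ("PENDING", "STARTED", "RETRY", "SUCCESS", "FAILURE", "REVOKED"):
        print(state, "->", next_action(state))
```

Inside monitor_tasks, this would be called as next_action(AsyncResult(first_task_id).state). Because the id passed to monitor_tasks comes from the first_function.delay() call made in the same request, each monitor only ever looks at its own first task.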