Celery chain - if any tasks fail, do x, else y


I'm just getting into Celery chains in my Django project. I have the following function:

def orchestrate_tasks_for_account(account_id):

    # Get the account, set status to 'SYNC' until the chain is complete
    account = Account.objects.get(id=account_id)
    account.status = "SYNC"
    account.save()

    chain = task1.s(account_id) | task2.s() | task3.s()
    chain()

    # if any of the tasks in the chain failed, set account.status = 'ERROR'
    # else set the account.status = 'OK'

The chain works as expected, but I'm not sure how to get the outcome of the chain and update the account based on it.

In other words, I'd like to set the account status to 'ERROR' if any task in the chain fails, and to 'OK' otherwise.

The Celery documentation leaves me unsure how to express the if/else I've sketched in the two comments at the end of the function above.

Does anyone have experience with this?


Solution

  • OK, here's what I came up with.

    This solution uses the waiting library to poll the chain's result until it is ready:

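    from celery import chain
    from waiting import TimeoutExpired, wait

    # Account, task1, task2 and task3 come from the project itself


    def orchestrate_tasks_for_account(account_id):

        # Mark the account as syncing until the chain finishes
        account = Account.objects.get(id=account_id)
        account.status = "SYNC"
        account.save()

        job = chain(
            task1.s(account_id),
            task2.s(),
            task3.s(),
        )
        # apply_async() returns the AsyncResult of the last task in the chain
        result = job.apply_async()

        try:
            wait(
                result.ready,  # poll until the final task has finished
                timeout_seconds=1800,  # give up after 30 minutes
                waiting_for="task orchestration to complete",
            )
        except TimeoutExpired:
            # The final task never finished (for example because an earlier
            # task failed); result.successful() is False, so fall through.
            pass

        account.status = "OK" if result.successful() else "ERROR"
        # Write only the status so changes the tasks made to other fields survive
        account.save(update_fields=["status"])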
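
One caveat with polling `result` alone: the `AsyncResult` returned by `apply_async()` on a chain refers to the last task. When an earlier task fails, the later ones are typically never run, so the final result may stay pending until the timeout. Below is a minimal sketch of a check that walks the `parent` links so a failure is noticed as soon as it happens. The helper name `chain_status` and the `FakeResult` stand-in are hypothetical and only there so the snippet runs without a broker; with a real chain you would pass in the `result` from `apply_async()`.

```python
def chain_status(result):
    """Summarise a chain from its final result: 'ERROR', 'OK', or 'SYNC'.

    `result` is expected to behave like celery.result.AsyncResult: it has
    failed(), successful(), and a `parent` link to the previous step.
    """
    node = result
    while node is not None:
        if node.failed():
            return "ERROR"  # some step raised, so later steps will not run
        node = node.parent
    # No step failed; the chain is done only once the final task succeeded.
    return "OK" if result.successful() else "SYNC"


class FakeResult:
    """Hypothetical stand-in for AsyncResult, for demonstration only."""

    def __init__(self, state, parent=None):
        self.state = state
        self.parent = parent

    def failed(self):
        return self.state == "FAILURE"

    def successful(self):
        return self.state == "SUCCESS"


# task1 succeeded, task2 failed, task3 never started.
t1 = FakeResult("SUCCESS")
t2 = FakeResult("FAILURE", parent=t1)
t3 = FakeResult("PENDING", parent=t2)
print(chain_status(t3))  # ERROR
```

In the function above, the `wait()` predicate could then be `lambda: chain_status(result) != "SYNC"`, and the value of `chain_status(result)` could be written to `account.status` directly.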
    

    I am open to suggestions to make this better!