Tags: python, celery, celery-task

Dynamically change task success-callback and error-callback within a chain in Celery


I have a graph of tasks, where each task may have on_success and on_error follow-up tasks. The success path is guaranteed to be acyclic (i.e. if all tasks run fine, no on_success handler links to a task that has already run) and guaranteed to have some final task (i.e. one without an on_success follow-up handler). But on_error tasks might link to any task, creating new flows (which are themselves positive-acyclic and finite). Example graph:

                 + final_1
           task_1
          +      - $task_2
entrypoint
          -      + final_2
           task_2
                 - $task_1

where $task_1 means that the execution flow jumps onto the other branch and runs task_1 directly.

That sample graph can be described as follows:

graph = {
    'entrypoint': 'entry_1',
    'tasks': {
        'entry_1': {
            'callback': 'text_generator',
            'on_success': 'task_1',
            'on_error': 'task_2',
        },
        'task_1': {
            'callback': 'text_modifier',
            'on_success': 'final_1',
            'on_error': 'task_2',
        },
        'task_2': {
            'callback': 'text_modifier',
            'on_success': 'final_2',
            'on_error': 'task_1',
        },
        'final_1': {
            'callback': 'echo_result',
            'on_success': None,
            'on_error': None,
        },
        'final_2': {
            'callback': 'store_result',
            'on_success': None,
            'on_error': None,
        },
    }
}

Also, the pipeline gets an arbitrary input to pass through all the tasks:

pipeline_input = {
    'foo': 'bar'
}

Technically, it can get into an infinite loop if both task_1 and task_2 always fail, but that's an acceptable case by design.

My current solution is to run a single Celery task and handle errors in my "tasks" (which then just run synchronously, blocking the Celery task):

@celery.task
def task_runner():
    task = graph['tasks'][graph['entrypoint']]
    task_input = pipeline_input
    while task:
        try:
            # call the current task ('callback' assumed resolved to a callable here)
            task_output = task['callback'](task_input)

            # define the successor; its input is the output of the current task
            task = graph['tasks'].get(task['on_success'])
            task_input = task_output
        except Exception:
            # define the error-successor; it receives the current input unchanged
            task = graph['tasks'].get(task['on_error'])

The code is a somewhat simplified version of mine, so it might contain obvious errors, but it's here to convey the basic idea.

Basically, at each point in time during execution I have a straightforward success-flow: entrypoint -> task_1 -> final_1 (no forks and no loops). Once I encounter an error, I just switch to a different success-flow if one is defined: entrypoint -> task_1 (error!) -> task_2 -> final_2.

What I want now is to run those tasks as Celery tasks, not just as synchronous calls within a single Celery task. But I cannot think of a way to define new chained flows if any task fails (that try/except block).

I could build a chain of valid tasks for entrypoint -> task_1 -> final_1 and do an apply_async():

from celery import chain

@celery.task
def task_chainer():
    task = graph['tasks'][graph['entrypoint']]
    success_chain = []
    while task:
        try:
            # collect the signature of each task along the success path
            # (the callback is assumed to be a registered Celery task here)
            success_chain.append(task['callback'].s())

            # define the successor
            task = graph['tasks'].get(task['on_success'])
        except Exception:
            # never reached: nothing actually runs while the chain is being built
            task = graph['tasks'].get(task['on_error'])
    chain(*success_chain).apply_async((pipeline_input,))

but that try/except wouldn't work, since it's now all async and I have to use error callbacks instead. And because the error paths hold cyclic potential, I cannot define all possible chains beforehand and just apply them once.
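
For context, here is roughly what Celery's built-in error callbacks look like; a minimal sketch, assuming the graph's callbacks are registered as Celery tasks under the same names. The errback receives the failed task's request context rather than the pipeline input, so it cannot simply re-route the flow into another branch of the graph:

from celery import chain

@celery.task
def error_handler(request, exc, traceback):
    # Called with the failed task's request, the exception and the
    # traceback; the original task_input is not passed along.
    print(f'Task {request.id} raised: {exc!r}')

workflow = chain(text_generator.s(), text_modifier.s(), echo_result.s())
workflow.apply_async((pipeline_input,), link_error=error_handler.s())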

So, my question is: how do I redefine a chain of tasks once some task fails? Or should this be done in some other way? Maybe chain.unchain_tasks() might be of some use here?


Solution

  • I worked out a solution that queues only a single task at a time: once the currently running task finishes (either with success or failure), it enqueues its follow-up. It looks similar to this:

    import logging


    @celery.task
    def task_chainer():
        # Enqueue the entrypoint task
        run_task.apply_async(
            kwargs={'graph': graph,
                    'task_id': graph['entrypoint'],
                    'task_input': pipeline_input},
            ignore_result=True)


    @celery.task
    def run_task(graph, task_id: str, task_input):
        current_task = graph['tasks'][task_id]

        try:
            # resolve the callback name to an actual callable and run it
            task_callable = _get_callable(current_task['callback'])
            output = task_callable(task_input)

            followup_task_id = current_task['on_success']
        except Exception as error:
            logging.exception(error)
            # the error-successor receives the current input unchanged
            output = task_input

            followup_task_id = current_task['on_error']

        if followup_task_id:
            run_task.apply_async(
                kwargs={'graph': graph,
                        'task_id': followup_task_id,
                        'task_input': output},
                ignore_result=True)
            return

        # If no follow-up is defined, then we're done with our graph and may
        # want to update the overall graph status as DONE.
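
_get_callable isn't shown above; a minimal sketch, assuming the graph's callback names map to plain Python functions held in a registry (the CALLBACKS dict and the function names are illustrative):

    # Hypothetical registry resolving callback names from the graph to
    # the actual functions; adapt to however your callbacks are defined.
    CALLBACKS = {
        'text_generator': text_generator,
        'text_modifier': text_modifier,
        'echo_result': echo_result,
        'store_result': store_result,
    }

    def _get_callable(name: str):
        return CALLBACKS[name]

Since each run_task enqueues at most one follow-up task, error branches can revisit earlier tasks without ever rebuilding a chain, which sidesteps the cyclic-chain problem from the question.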