I have a graph of tasks, where each task may have on_success and on_error follow-up tasks. The success path is guaranteed to be acyclic (i.e. if all tasks run fine, no on_success handler links to a task that has already run) and guaranteed to have some final task (i.e. a task without an on_success follow-up handler). But on_error tasks might link to any task, creating new flows (which are themselves acyclic along the success path, and finite). Example graph:
    entrypoint
    + task_1
      + final_1
      - $task_2
    - task_2
      + final_2
      - $task_1
where + marks the on_success branch, - marks the on_error branch, and $task_1 means that execution flow goes onto another branch and runs task_1 directly.
That sample graph can be described as follows:
    graph = {
        'entrypoint': 'entry_1',
        'tasks': {
            'entry_1': {
                'callback': 'text_generator',
                'on_success': 'task_1',
                'on_error': 'task_2',
            },
            'task_1': {
                'callback': 'text_modifier',
                'on_success': 'final_1',
                'on_error': 'task_2',
            },
            'task_2': {
                'callback': 'text_modifier',
                'on_success': 'final_2',
                'on_error': 'task_1',
            },
            'final_1': {
                'callback': 'echo_result',
                'on_success': None,
                'on_error': None,
            },
            'final_2': {
                'callback': 'store_result',
                'on_success': None,
                'on_error': None,
            },
        },
    }
Also, the pipeline gets arbitrary input that is passed through all the tasks:
    pipeline_input = {
        'foo': 'bar',
    }
Technically it can get into an infinite loop if both task_1 and task_2 always fail, but that's an OK case by design.
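Both guarantees on the success path can be checked up front. A minimal sketch of such a check over the dict format above (the function name is mine):

```python
def success_path_is_acyclic(graph):
    """Follow only on_success edges from the entrypoint; a repeat means a cycle."""
    seen = set()
    task_id = graph['entrypoint']
    while task_id is not None:
        if task_id in seen:
            return False
        seen.add(task_id)
        task_id = graph['tasks'][task_id]['on_success']
    return True
```

Note that this only inspects the flow starting at the entrypoint; each flow entered through an on_error link would need the same check from its own first task.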
My current solution is to run a single Celery task and handle errors inside my "tasks" (which are then just run synchronously, blocking the Celery task).
    @celery.task
    def task_runner():
        task = graph.tasks[graph.entrypoint]
        task_input = some_input
        task_output = None
        while task:
            try:
                # call the current task
                task_output = task.callback(task_input)
                # move on to the success follow-up; its input is this task's output
                task = graph.tasks[task.on_success] if task.on_success else None
                task_input = task_output
            except Exception:
                # move on to the error follow-up; it receives the same input unchanged
                task = graph.tasks[task.on_error] if task.on_error else None
The code is a somewhat simplified version of mine, so it might contain obvious errors, but it's here to convey the basic idea.
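For reference, the same loop works stand-alone. In this sketch (toy callbacks of mine, same graph shape) task_1 always fails, so the flow switches to the error branch:

```python
# Toy graph: entry_1 succeeds, task_1 always raises, so execution
# falls over to task_2 -> final_2.
graph = {
    'entrypoint': 'entry_1',
    'tasks': {
        'entry_1': {'callback': lambda x: x + ['entry'], 'on_success': 'task_1', 'on_error': 'task_2'},
        'task_1':  {'callback': lambda x: 1 / 0,         'on_success': 'final_1', 'on_error': 'task_2'},
        'task_2':  {'callback': lambda x: x + ['t2'],    'on_success': 'final_2', 'on_error': 'task_1'},
        'final_1': {'callback': lambda x: x,             'on_success': None, 'on_error': None},
        'final_2': {'callback': lambda x: x + ['done'],  'on_success': None, 'on_error': None},
    },
}

def run_graph(graph, task_input):
    task = graph['tasks'][graph['entrypoint']]
    while task is not None:
        try:
            task_input = task['callback'](task_input)
            next_id = task['on_success']
        except Exception:
            # the error follow-up receives the same input unchanged
            next_id = task['on_error']
        task = graph['tasks'][next_id] if next_id else None
    return task_input

run_graph(graph, [])  # -> ['entry', 't2', 'done']
```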
Basically, at each point in time during execution I have a straightforward success flow: entrypoint -> task_1 -> final_1 (no forks and no loops). Once I encounter an error, I just switch to a different success flow, if one is defined: entrypoint -> task_1 (error!) -> task_2 -> final_2.
What I want now is to run those tasks as Celery tasks, not just as synchronous methods within a single Celery task. But I cannot think of a way to define new chained flows if any task fails (that try/except block).
I could build a chain of valid tasks for entrypoint -> task_1 -> final_1 and do an apply_async():
    @celery.task
    def task_chainer():
        task = graph.tasks[graph.entrypoint]
        task_input = some_input
        success_chain = []
        while task:
            try:
                success_chain.append(task.callback.s(task_input))
                # move on to the success follow-up
                task = graph.tasks[task.on_success] if task.on_success else None
            except Exception:
                # move on to the error follow-up
                task = graph.tasks[task.on_error] if task.on_error else None
        chain(*success_chain).apply_async()
but that try/except wouldn't work, since everything is now async and I have to use error callbacks. And because the error cases give those chains cyclic potential, I cannot define all the chains beforehand and just apply them once.
So, my question is: how to redefine a chain of tasks once some task fails? Or should this be done in some other way? Maybe chain.unchain_tasks() might be of some use here?
I worked out a solution: queue only a single task once the currently running task finishes (with either success or failure). It looks similar to this:
    @celery.task
    def task_chainer():
        # Enqueue the entrypoint task
        run_task.apply_async(
            kwargs={'graph': graph, 'task_id': graph['entrypoint'], 'task_input': pipeline_input},
            ignore_result=True,
        )

    @celery.task
    def run_task(graph, task_id: str, task_input):
        current_task = graph['tasks'][task_id]
        followup_task_id = None
        try:
            task_callable = _get_callable(current_task['callback'])
            output = task_callable(task_input)
            followup_task_id = current_task['on_success']
        except Exception as error:
            logging.exception(error)
            # The error follow-up receives the failed task's input unchanged
            output = task_input
            followup_task_id = current_task['on_error']
        if followup_task_id:
            run_task.apply_async(
                kwargs={'graph': graph, 'task_id': followup_task_id, 'task_input': output},
                ignore_result=True,
            )
            return
        # If no follow-up is defined, we're done with the graph and may want
        # to update the overall graph status as DONE