I am using Luigi to launch a pipeline. Let's take a simple example:
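from luigi.scheduler import CentralPlannerScheduler  # named luigi.scheduler.Scheduler in newer Luigi releases
from luigi.worker import Worker

task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()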
Now let's say that myTask raises an exception during execution. All I get is a log message from Luigi showing the exception.
Is there any way for Luigi to propagate it, or at least return a failure status?
My program could then react according to that status.
Thanks.
EDIT: I forgot to mention that my Luigi outputs target a database, where I store the results. If an exception is raised, no results are stored, but the exception is not propagated out of Luigi. I was wondering whether Luigi has an option for this.
From the docs:
Luigi has a built-in event system that allows you to register callbacks to events and trigger them from your own tasks. You can both hook into some pre-defined events and create your own. Each event handle is tied to a Task class and will be triggered only from that class or a subclass of it. This allows you to effortlessly subscribe to events only from a specific class (e.g. for hadoop jobs).
Example:
import luigi
from my_tasks import MyTask

@MyTask.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    """Will be called directly after a failed execution
    of `run` on any MyTask subclass
    """
    do_something()  # placeholder for your own reaction to the failure

luigi.run()
Luigi has many events to choose from. You can also look at these tests to learn how to listen and react to other events.