Search code examples
pythonluigi

Can Luigi propagate exception or return any result?


I am using Luigi to launch some pipeline. Let's take a simple exemple

task = myTask()
w = Worker(scheduler=CentralPlannerScheduler(), worker_processes=1)
w.add(task)
w.run()

Now let's say that myTask is raising an exception during execution. All that I am able to have is a log from luigi showing the exception.

Is there any way that luigi could propagate it or at least return a failure status ?

I would then be able to make my programm react in function of that state.

Thanks.

EDIT I forgot to specify that luigi's outputs are targetting a database when I am storing the result. If an exception is raised, no result are stored but the exception is not propagated out a luigi. I was wondering if luigi have an option to have this.


Solution

  • From docs:

    Luigi has a built-in event system that allows you to register callbacks to events and trigger them from your own tasks. You can both hook into some pre-defined events and create your own. Each event handle is tied to a Task class and will be triggered only from that class or a subclass of it. This allows you to effortlessly subscribe to events only from a specific class (e.g. for hadoop jobs).

    Example:

    import luigi
    
    from my_tasks import MyTask
    
    
    @MyTask.event_handler(luigi.Event.FAILURE)
    def mourn_failure(task, exception):
        """Will be called directly after a failed execution
        of `run` on any MyTask subclass
        """
    
        do_something()
    
    
    luigi.run()
    

    Luigi has a lot of events you can choose from. You can also take a look at this tests in order to learn how to listen and react to other events.