
Is it possible to reuse a task in two different spots in a Luigi pipeline?


I am developing a Luigi pipeline and am running into a major problem. There doesn't seem to be any way to reuse tasks in the same pipeline. To illustrate what I am after, consider the following workflow:

Collect_Data → Clean_data → Task_on_data_A → Task_on_data_B
               ↳ Sample_data → Task_on_data_A → Task_on_data_B

I perform the same operations on Clean_data and Sample_data. I want to do something like this in Luigi while following DRY principles. Right now it seems like I have to literally copy and paste my code for tasks A and B to do this.

Airflow has a way to deal with situations like this. Does Luigi?


Solution

  • I think you should draw the arrows as dependency arrows pointing the other way. Thinking about it this way helps because Luigi determines the execution path from each task's requires() method. Your scenario is simple: TaskA just has a conditional dependency.

    You just need to add one more task at the top of the graph (the one you actually run) that requires two TaskB instances, each given a flag indicating whether it should run on the sampled data.

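    A Python sketch of that structure (output() and run() are omitted; every real task needs them so Luigi can tell when it is complete):

    import luigi

    class ParentTask(luigi.WrapperTask):
        # Top-level task: one TaskB per branch
        def requires(self):
            return [TaskB(sample_type=True),
                    TaskB(sample_type=False)]

    class TaskB(luigi.Task):
        sample_type = luigi.BoolParameter(default=False)

        def requires(self):
            # Pass the flag through so each TaskB depends on the matching TaskA
            return TaskA(sample_type=self.sample_type)

    class TaskA(luigi.Task):
        sample_type = luigi.BoolParameter(default=False)

        def requires(self):
            # Conditional dependency: the same TaskA code serves both branches
            if self.sample_type:
                return SampleTask()
            return CleanTask()

    class SampleTask(luigi.Task):
        def requires(self):
            return CleanTask()

    class CleanTask(luigi.Task):
        def requires(self):
            return CollectData()

    class CollectData(luigi.Task):
        pass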