I am developing a Luigi pipeline and am running into a major problem. There doesn't seem to be any way to reuse tasks in the same pipeline. To illustrate what I am after, consider the following workflow:
Collect_Data → Clean_data → Task_on_data_A → Task_on_data_B
               ↳ Sample_data → Task_on_data_A → Task_on_data_B
I perform the same operations on Clean_data and Sample_data. I want to do something like this in Luigi while following DRY principles. Right now it seems like I have to literally copy and paste my code for tasks A and B to do this.
Airflow has a way to deal with situations like this. Does Luigi?
I think you should draw the arrows as dependency arrows pointing the other way, from each task to the tasks it requires. It helps to think about it this way because in Luigi the execution path is determined by the requires() methods. Seen that way, your scenario is very simple: you just have a TaskA with a conditional dependency.
You just need to add one more task at the start (the entry point) that requires two TaskB instances, each given a parameter indicating whether that TaskB should run on the sampled data.
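To see why this removes the duplication, here is a toy model in plain Python of how a requires()-driven scheduler resolves this graph. It is deliberately not Luigi's actual scheduler: the Task base class, key() and run_order() are hypothetical. It walks requires() depth-first, runs dependencies before dependents, and treats two tasks with the same class and parameters as one task (Luigi likewise identifies a task by its class and parameter values).

```python
# Toy model of a requires()-driven scheduler; NOT Luigi's implementation.
class Task:
    def __init__(self, **params):
        self.params = params

    def requires(self):
        return []

    def key(self):
        # Identity = class name + parameter values
        return (type(self).__name__, tuple(sorted(self.params.items())))


def run_order(root):
    """Dependencies first; each distinct (class, params) task appears once."""
    order, seen = [], set()

    def visit(task):
        if task.key() in seen:
            return
        seen.add(task.key())
        deps = task.requires()
        if not isinstance(deps, list):
            deps = [deps]
        for dep in deps:
            visit(dep)
        order.append(task.key())

    visit(root)
    return order


class CollectData(Task):
    pass


class CleanTask(Task):
    def requires(self):
        return CollectData()


class SampleTask(Task):
    def requires(self):
        return CleanTask()


class TaskA(Task):
    def requires(self):
        # The only branch point: which upstream data this instance uses
        return SampleTask() if self.params["sample_type"] else CleanTask()


class TaskB(Task):
    def requires(self):
        return TaskA(sample_type=self.params["sample_type"])


class ParentTask(Task):
    def requires(self):
        return [TaskB(sample_type=True), TaskB(sample_type=False)]


for name, params in run_order(ParentTask()):
    print(name, dict(params))
```

TaskA and TaskB are each defined once but scheduled twice (once per sample_type value), while CleanTask appears exactly once even though both branches depend on it.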
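In Luigi (output() and run() omitted for brevity):

import luigi

class ParentTask(luigi.WrapperTask):
    def requires(self):
        return [TaskB(sample_type=True),
                TaskB(sample_type=False)]

class TaskB(luigi.Task):
    sample_type = luigi.BoolParameter()

    def requires(self):
        # Pass the flag down so each TaskB gets the matching TaskA
        return TaskA(sample_type=self.sample_type)

class TaskA(luigi.Task):
    # True: use the sampled data; False: use the full cleaned data
    sample_type = luigi.BoolParameter()

    def requires(self):
        if self.sample_type:
            return SampleTask()
        else:
            return CleanTask()

class SampleTask(luigi.Task):
    def requires(self):
        return CleanTask()

class CleanTask(luigi.Task):
    def requires(self):
        return CollectData()

class CollectData(luigi.Task):
    pass  # root of the graph: nothing to require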