In luigi, I'm trying to set up a workflow that goes like this:
1) Parse data
2) Do calculations on parsed data
3) Tar calculated data together
These operations need to run in order, and I have a couple of workflows set up like this. Making 2 require 1 is easy, but I don't want 3 to explicitly require 2, because then I can't reuse the tar task in other workflows. So, how can I do this?
I know that dynamic dependencies work, but they are intended for cases where you don't know the dependency list ahead of time, whereas here I do. They also force me to write a Workflow task that yields tasks 2 and 3 in order, instead of just scheduling them.
One possible solution I've tried is to make a superclass that takes a task as a parameter, but unfortunately this doesn't work, as classes cannot be parameters, only primitives and dates. So, what is the right way of making this work?
I've included the current method below:
    import luigi

    class TaskOne(luigi.Task):
        def output(self):
            return luigi.LocalTarget("...")

        def run(self):
            with self.output().open('w') as out_file:
                pass  # Do parsing

    class TaskTwo(luigi.Task):
        def requires(self):
            return TaskOne()

        def output(self):
            return luigi.LocalTarget(".../success.txt")

        def run(self):
            with self.input().open('r') as in_file:
                pass  # Do calculations
            with self.output().open('w') as out_file:
                out_file.write("1")

    class TarTask(luigi.Task):
        directory = luigi.Parameter()

        def output(self):
            return luigi.LocalTarget(self.directory + ".tar.xz")

        def run(self):
            # Tar to temporary tar target then mv file to output location
            pass
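    class Workflow(luigi.Task):
        # Added so that `directory` is defined; the original snippet used it without declaring it
        directory = luigi.Parameter()

        def output(self):
            return luigi.LocalTarget(".../wf_success.txt")

        def run(self):
            # Dynamic dependencies: each yield suspends run() until that task is complete
            yield TaskTwo()
            yield TarTask(directory=self.directory)
            with self.output().open('w') as out_file:
                out_file.write("1")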
I've come up with one way of tackling this problem: you can dynamically set the requires method of each task instance, like so:
    from types import MethodType

    def sequence_tasks(tasks):
        """Make each task instance require the one before it; return the last task."""
        prev_task = None
        for task in tasks:
            def requires_method(self):
                return self.prev_task
            # Override requires() on this instance only, not on the class
            task.requires = MethodType(requires_method, task)
            task.prev_task = prev_task
            prev_task = task
        return prev_task
This doesn't work, however, if the tasks in your sequence already require other tasks, because their original requires is overwritten. For that, you would need a more sophisticated requires_method that calls the old requires and appends your new requirement task to its result.