
How to run a task after another is complete without requiring it


In Luigi, I'm trying to set up a workflow that goes like this:

1) Parse data

2) Do calculations on parsed data

3) Tar calculated data together

These operations need to run in order, and I have a couple of workflows set up like this. Making step 2 require step 1 is easy, but I don't want step 3 to explicitly require step 2, because then I can't reuse the tar task in other workflows. How can I do this?

I know that dynamic dependencies work, but they are intended for cases where you don't know the dependency list ahead of time, whereas here I do. They also force me to write a Workflow task that yields tasks 2 and 3 in order, instead of simply scheduling them.

One possible solution I've tried is to make a superclass that takes a task as a parameter, but unfortunately this doesn't work, because classes cannot be parameters, only primitives and dates. So what is the right way to make this work?

I've included my current approach below:

import luigi


class TaskOne(luigi.Task):
    def output(self):
        return luigi.LocalTarget("...")

    def run(self):
        with self.output().open('w') as out_file:
            pass  # Do parsing and write the parsed data to out_file


class TaskTwo(luigi.Task):
    def requires(self):
        return TaskOne()

    def output(self):
        return luigi.LocalTarget(".../success.txt")

    def run(self):
        with self.input().open('r') as in_file:
            pass  # Do calculations on the parsed data
        with self.output().open('w') as out_file:
            out_file.write("1")


class TarTask(luigi.Task):
    directory = luigi.Parameter()

    def output(self):
        # Parameters are read through self, not as bare names
        return luigi.LocalTarget(self.directory + ".tar.xz")

    def run(self):
        pass  # Tar to a temporary tar target, then mv the file to the output location


class Workflow(luigi.Task):
    # Assumed: the directory to tar is passed in as a parameter
    directory = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(".../wf_success.txt")

    def run(self):
        # Dynamic dependencies: each yield suspends run() until that task is done
        yield TaskTwo()
        yield TarTask(directory=self.directory)
        with self.output().open('w') as out_file:
            out_file.write("1")
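The comment in TarTask.run describes an atomic write: build the archive under a temporary name, then move it into place. Below is a stdlib-only sketch of that step. tar_directory_atomically is a hypothetical helper name, and it assumes the archive goes next to the directory as directory + ".tar.xz", matching TarTask.output; inside the task, run() could call it with self.directory.

```python
import os
import tarfile
import tempfile


def tar_directory_atomically(directory: str) -> str:
    """Hypothetical helper: archive `directory` to directory + ".tar.xz".

    The archive is written to a temporary file in the destination folder
    first and only then moved into place, so the final path never holds
    a half-written archive.
    """
    directory = os.path.normpath(directory)  # drop any trailing slash
    final_path = directory + ".tar.xz"
    out_dir = os.path.dirname(final_path) or "."

    # Temp file lives next to the destination so os.replace is a same-filesystem rename.
    fd, tmp_path = tempfile.mkstemp(suffix=".tar.xz.tmp", dir=out_dir)
    os.close(fd)
    try:
        with tarfile.open(tmp_path, "w:xz") as tar:
            # Store entries relative to the directory's own name, e.g. "calc/result.txt".
            tar.add(directory, arcname=os.path.basename(directory))
        os.replace(tmp_path, final_path)  # the "mv" step
    except BaseException:
        os.remove(tmp_path)  # never leave a partial temp archive behind
        raise
    return final_path
```

Keeping the temporary file in the destination folder is what makes the final os.replace an atomic rename on POSIX systems. Recent Luigi versions also offer a temporary_path() context manager on file targets for this same pattern; check the documentation for your version.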

Solution

  • I've come up with one way to handle this problem: you can dynamically replace the requires method of a task instance, like so:

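    from types import MethodType


    def sequence_tasks(tasks):
        """Make each task in `tasks` require the task before it; return the last one."""
        prev_task = None
        for task in tasks:

            def requires_method(self):
                # Looked up at call time, so each task returns its own predecessor
                return self.prev_task

            # Override requires() on this instance only, not on the whole class
            task.requires = MethodType(requires_method, task)
            task.prev_task = prev_task  # None for the first task, i.e. no requirement
            prev_task = task
        return prev_task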
    

    This doesn't work if the tasks in your sequence already require other tasks, however, because the new requires method replaces the old one. For that case, you would need a more sophisticated requires_method that calls the original requires and adds the new requirement to whatever it returns.
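One way that more sophisticated version could look, as a sketch assuming requires() returns None, a single task, or a (possibly nested) list, tuple, or dict: each instance's original bound requires is captured before it is overridden, and the predecessor is appended to the flattened result. _flatten is a hypothetical helper written here instead of using Luigi's own, so the snippet does not import luigi and works on any object with a requires() method. One caveat: Luigi derives self.input() from the structure of requires(), so a later task in the chain whose run() expects a single input will now receive a list. The first task is left untouched for that reason.

```python
from types import MethodType


def _flatten(struct):
    """Hypothetical helper: turn a requires() result into a flat list."""
    if struct is None:
        return []
    if isinstance(struct, dict):
        struct = list(struct.values())
    if isinstance(struct, (list, tuple)):
        flat = []
        for item in struct:
            flat.extend(_flatten(item))
        return flat
    return [struct]


def sequence_tasks(tasks):
    """Chain task instances so each also requires the one before it,
    keeping whatever its original requires() already returned."""
    prev_task = None
    for task in tasks:
        # Bound method captured before the instance attribute shadows it
        original_requires = task.requires

        # Default arguments freeze this iteration's values for the closure
        def requires_method(self, _orig=original_requires, _prev=prev_task):
            reqs = _orig()
            if _prev is None:
                return reqs  # first task: keep its requires() result as-is
            return _flatten(reqs) + [_prev]

        task.requires = MethodType(requires_method, task)
        prev_task = task
    return prev_task
```

With Luigi you would pass task instances, for example sequence_tasks([TaskTwo(), TarTask(directory=...)]), and schedule the returned task with luigi.build([...], local_scheduler=True).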