How to use the output of an upstream task to drive the result of requires()?


In Luigi, I have a task where I want to dynamically generate a list of dependencies based on the output from another upstream task. For example:

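import luigi

class TaskA(luigi.Task):
  param = luigi.IntParameter()

class TaskB(luigi.Task):
  def run(self):
    pass
  def output(self):
    # Shorthand: stands in for the values TaskB produces
    return [1,2,3,4]

class TaskC(luigi.Task):
  def requires(self):
    # Problem: TaskB has not run yet when requires() is called
    return [TaskB()] + [TaskA(param=p) for p in TaskB().output()]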

In summary, I am creating a set of TaskA dependencies in TaskC, based on the output from TaskB.

I have tried a few things, but Luigi seems to get confused because TaskB needs to run before TaskC can return its list of dependencies, yet Luigi cannot run anything until it has called TaskC.requires().

Is there any way to make this work and accomplish what I am trying to do here?

In my real-life scenario, the implementations of these tasks are much more complex, but this is the gist of how the tasks are connected.


Solution

  • Great question! Luigi has a feature for exactly this, called dynamic dependencies, which is covered in the docs: https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies

    Basically, you require TaskB as usual and then yield the new tasks from the run method, based on TaskB's output. By the time run is called, TaskB has already completed, so its output is available to read. Here is an example:

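    import luigi

    class TaskC(luigi.Task):
      def requires(self):
        return TaskB()

      def run(self):
        # TaskB is done by now; assumes it wrote one integer per line to a file target.
        with self.input().open('r') as f:
          params = [int(line) for line in f if line.strip()]
        # Tasks yielded from run() are scheduled as dynamic dependencies.
        yield [TaskA(param=p) for p in params]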