Tags: python, python-3.x, luigi

How do I represent subflows in Luigi, the Python task manager?


Suppose I have a flow A->B, and users of this flow require B. I also have a task C that requires B, but when C runs I want D to happen before the A->B flow starts. How do I express this in Luigi?


Solution

  • One solution is to pass C's prerequisite (here, D) into the A->B subflow as a parameter, so that A, the first task in the subflow, requires it:

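    import luigi

    class A(luigi.Task):
        # Task injected by the caller that must run before the subflow starts.
        precedes = luigi.Parameter()

        def requires(self):
            return self.precedes

    class B(luigi.Task):
        # Accepted here only so it can be forwarded to A.
        precedes = luigi.Parameter()

        def requires(self):
            return A(precedes=self.precedes)

    class D(luigi.Task):
        pass

    class C(luigi.Task):
        def requires(self):
            # Inject D, giving the order D -> A -> B -> C.
            return B(precedes=D())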
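
To check the order this wiring produces without running a scheduler, here is a minimal, dependency-free sketch. It does not use Luigi: the `Task` base class and the `execution_order` helper are hypothetical stand-ins that model only `requires()`. It also assumes `precedes` may be left out (defaulting to `None`), which the Luigi snippet above does not allow, so that ordinary users of B can be shown alongside C.

```python
class Task:
    """Hypothetical stand-in for luigi.Task; models only requires()."""

    def __init__(self, precedes=None):
        # Assumption: the injected prerequisite is optional in this sketch.
        self.precedes = precedes

    def requires(self):
        return []

    @property
    def name(self):
        return type(self).__name__


class D(Task):
    pass


class A(Task):
    def requires(self):
        # A depends on whatever the caller injected, if anything.
        return [self.precedes] if self.precedes is not None else []


class B(Task):
    def requires(self):
        # B forwards the injected prerequisite down to A.
        return [A(precedes=self.precedes)]


class C(Task):
    def requires(self):
        # C asks for the subflow with D placed in front of it.
        return [B(precedes=D())]


def execution_order(task):
    """Depth-first walk: each requirement is listed before its dependent."""
    order = []
    for dep in task.requires():
        order.extend(execution_order(dep))
    order.append(task.name)
    return order


print(execution_order(C()))  # ['D', 'A', 'B', 'C']
print(execution_order(B()))  # ['A', 'B']
```

The point of the design is that B never names D. Only C knows about D, so other users of the A->B subflow are unaffected by C's extra prerequisite.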