
How to create nested branches in metaflow?


I am using Metaflow to build a text-processing pipeline with the following structure:

                                 ___F------
                     ______ D---|          |  
                    |           |___G---|  |__>  
          ____B-----|                   |----->H
         |          |______E_________________> ^
      A -|                                     |
         |____C________________________________|
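
As a quick illustration (plain Python, not Metaflow; the letters are the ones from the drawing), writing the diagram down as an adjacency map shows three split points, A, B and D, while a single step, H, receives the edges coming out of all of them:

```python
# Edges of the diagram: step -> steps it branches to (names from the drawing)
DIAGRAM = {
    'A': ['B', 'C'],
    'B': ['D', 'E'],
    'C': ['H'],
    'D': ['F', 'G'],
    'E': ['H'],
    'F': ['H'],
    'G': ['H'],
    'H': [],
}

# A split is any step with more than one successor
splits = [name for name, succs in DIAGRAM.items() if len(succs) > 1]

# Steps that feed directly into H
into_h = [name for name, succs in DIAGRAM.items() if 'H' in succs]

print(splits)  # ['A', 'B', 'D']
print(into_h)  # ['C', 'E', 'F', 'G']
```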

According to the documentation, a branch lets steps run in parallel, so I use branches to compute (B, C), (D, E) and (F, G) in parallel. Finally, all the branches are joined at H. Here is the code implementing this logic:

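from metaflow import FlowSpec, step

class TextProcessing(FlowSpec):

  @step
  def start(self):
    # Step A in the diagram: Metaflow requires the first step to be named start
    ...  # processing elided

    self.next(self.b, self.c)

  @step
  def c(self):
    # Assign to self so the value is stored as an artifact for later steps
    self.result1 = {}
    ...  # processing elided

    self.next(self.join)

  @step
  def b(self):
    ...  # processing elided

    self.next(self.d, self.e)

  @step
  def e(self):
    self.result2 = []
    ...  # processing elided

    self.next(self.join)

  @step
  def d(self):
    ...  # processing elided

    self.next(self.f, self.g)

  @step
  def f(self):
    self.result3 = []
    ...  # processing elided

    self.next(self.join)

  @step
  def g(self):
    self.result4 = []
    ...  # processing elided

    self.next(self.join)

  # A single join collecting all four branches: this is what the validator rejects
  @step
  def join(self, results):
    data = [results.c.result1, results.e.result2, results.f.result3, results.g.result4]
    print(data)

    self.next(self.end)

  @step
  def end(self):
    pass

if __name__ == '__main__':
  TextProcessing()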

When I run python main.py run, I get the following error:

Metaflow 2.2.10 executing TextProcessing for user:ubuntu
Validating your flow...
    Validity checker found an issue on line 83:
    Step join seems like a join step (it takes an extra input argument) but an incorrect number of steps (c, e, f, g) lead to it. This join was expecting 2 incoming paths, starting from splitted step(s) f, g.

Can someone point out where I am going wrong?
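
The rule behind this error can be illustrated with a simplified, pure-Python model. It is a sketch under stated assumptions, not Metaflow's own validator: every step with more than one incoming edge is treated as a join, each split opens a new level, and a join is accepted only if all its incoming edges come from the same innermost split, with exactly one edge per branch of that split. The function find_unbalanced_joins and both graph dictionaries are hypothetical.

```python
from collections import deque


def find_unbalanced_joins(graph):
    """Return steps with several incoming edges that do not close exactly one split.

    graph maps each step to the steps it passes to self.next(). Simplified
    model (hypothetical): any step with more than one incoming edge is a join.
    """
    preds = {name: [] for name in graph}
    for name, succs in graph.items():
        for nxt in succs:
            preds[nxt].append(name)

    stacks = {}  # step -> tuple of open splits, innermost last

    def outgoing_stack(name):
        # Open splits carried on an edge leaving `name`; a split pushes itself
        if len(graph[name]) > 1:
            return stacks[name] + (name,)
        return stacks[name]

    remaining = {name: len(p) for name, p in preds.items()}
    queue = deque(name for name, n in remaining.items() if n == 0)
    unbalanced = []

    while queue:  # Kahn's algorithm: visit a step after all its predecessors
        name = queue.popleft()
        incoming = [outgoing_stack(p) for p in preds[name]]
        if not incoming:
            stacks[name] = ()
        elif len(incoming) == 1:
            stacks[name] = incoming[0]
        else:
            first = incoming[0]
            ok = (
                all(s == first for s in incoming)
                and len(first) > 0
                and len(incoming) == len(graph[first[-1]])
            )
            if not ok:
                unbalanced.append(name)
            # The join closes the innermost split (best effort if unbalanced)
            stacks[name] = first[:-1]
        for nxt in graph[name]:
            remaining[nxt] -= 1
            if remaining[nxt] == 0:
                queue.append(nxt)
    return unbalanced


# One join for everything, as in the question
QUESTION = {
    'start': ['b', 'c'], 'b': ['d', 'e'], 'c': ['join'], 'd': ['f', 'g'],
    'e': ['join'], 'f': ['join'], 'g': ['join'], 'join': ['end'], 'end': [],
}

# One join per split, innermost first
ANSWER = {
    'start': ['b', 'c'], 'b': ['d', 'e'], 'c': ['merge_3'], 'd': ['f', 'g'],
    'e': ['merge_2'], 'f': ['merge_1'], 'g': ['merge_1'],
    'merge_1': ['merge_2'], 'merge_2': ['merge_3'], 'merge_3': ['end'], 'end': [],
}

print(find_unbalanced_joins(QUESTION))  # ['join']
print(find_unbalanced_joins(ANSWER))    # []
```

In this model the single join step mixes edges from three different split levels, which is consistent with the complaint in the error message, while the three-join layout closes each split in turn.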


Solution

  • After going through the docs again carefully, I realised that I wasn't handling joins properly. The docs for Metaflow 2.2.10 say:

    Note that you can nest branches arbitrarily, that is, you can branch inside a branch. Just remember to join all the branches that you create.

    This means every split must be closed by its own join step. To combine values coming from the branches, Metaflow provides the merge_artifacts utility function, which helps propagate unambiguous values.

    Since the workflow contains three splits (A into B and C, B into D and E, D into F and G), I added three join steps to merge the results, closing the innermost split first.

    The following changes worked for me:

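    from metaflow import FlowSpec, step

    class TextProcessing(FlowSpec):

      @step
      def start(self):
        # Step A in the diagram: Metaflow requires the first step to be named start
        ...  # processing elided

        self.next(self.b, self.c)

      @step
      def c(self):
        # Assign to self so the value is stored as an artifact for later steps
        self.result1 = {}
        ...  # processing elided

        self.next(self.merge_3)

      @step
      def b(self):
        ...  # processing elided

        self.next(self.d, self.e)

      @step
      def e(self):
        self.result2 = []
        ...  # processing elided

        self.next(self.merge_2)

      @step
      def d(self):
        ...  # processing elided

        self.next(self.f, self.g)

      @step
      def f(self):
        self.result3 = []
        ...  # processing elided

        self.next(self.merge_1)

      @step
      def g(self):
        self.result4 = []
        ...  # processing elided

        self.next(self.merge_1)

      @step
      def merge_1(self, results):
        # Innermost join: closes the d -> (f, g) split
        self.result = {
          'result4': results.g.result4,
          'result3': results.f.result3
        }

        self.next(self.merge_2)

      @step
      def merge_2(self, results):
        # Closes the b -> (d, e) split
        self.result = {'result2': results.e.result2, **results.merge_1.result}
        # No effect here: merge_artifacts only merges artifacts not already set
        # in this step, and self.result was just assigned above
        self.merge_artifacts(results, include=['result'])
        self.next(self.merge_3)

      @step
      def merge_3(self, results):
        # Outermost join: closes the start -> (b, c) split
        self.result = {'c': results.c.result1, **results.merge_2.result}
        self.merge_artifacts(results, include=['result'])  # also a no-op, as above
        self.next(self.end)

      @step
      def end(self):
        print(self.result)

    if __name__ == '__main__':
      TextProcessing()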
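
The Metaflow docs describe merge_artifacts as considering only artifacts that are not already set in the join step, and as raising an error when the incoming branches disagree on a value. The toy model below mimics that description with plain dictionaries so the behaviour can be tried without running a flow. It is an assumption-laden sketch, not Metaflow's implementation: toy_merge_artifacts and the sample data are hypothetical, and values are compared with == rather than the way Metaflow itself compares artifacts.

```python
def toy_merge_artifacts(current, inputs, include=None):
    """Toy model of a join-step merge (hypothetical, not Metaflow's code).

    current -- dict of artifacts already assigned in the join step
    inputs  -- list of dicts, one per incoming branch
    include -- optional list of artifact names to consider
    """
    candidates = {}
    for branch in inputs:
        for name, value in branch.items():
            if include is not None and name not in include:
                continue
            if name in current:
                # Already set in this step, so it is never overwritten
                continue
            candidates.setdefault(name, []).append(value)

    # A value is unambiguous only if every branch that has it agrees
    conflicts = sorted(
        name for name, values in candidates.items()
        if any(v != values[0] for v in values)
    )
    if conflicts:
        raise ValueError(f"conflicting values for: {conflicts}")

    for name, values in candidates.items():
        current[name] = values[0]
    return current


# Without include: result is already set, everything else is merged
step = {'result': {'result2': ['x']}}
toy_merge_artifacts(step, [
    {'result2': ['x'], 'config': 'v1'},           # hypothetical branch e
    {'result': {'result3': []}, 'config': 'v1'},  # hypothetical branch merge_1
])
print(step)
# {'result': {'result2': ['x']}, 'result2': ['x'], 'config': 'v1'}

# Like merge_2/merge_3: include=['result'], but result is already set
kept = {'result': {'a': 1}}
toy_merge_artifacts(kept, [{'result': {'b': 2}}], include=['result'])
print(kept)  # {'result': {'a': 1}}

# Branches that disagree are rejected
try:
    toy_merge_artifacts({}, [{'mode': 'a'}, {'mode': 'b'}])
except ValueError as err:
    print(err)  # conflicting values for: ['mode']
```

The second call mirrors merge_2 and merge_3 in the solution: because self.result is assigned before merge_artifacts(results, include=['result']) runs, the call leaves it untouched, and it is the explicit dictionary merge with ** that actually carries the results forward.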