Tags: python, kedro

Dynamic instances of pipeline execution based on dataset partition/iterator logic


I am not sure whether this is possible, but here is what I am trying to do:

I want to extract portions (steps) of a function as individual nodes (fine so far), but the catch is that I have an iterator on top of the steps, driven by some logic on the dataset, i.e. the same operation (each independent of the others) is repeated on logical partitions of a dataset.

Example code

def single_node(list_of_numbers):
   modified_list = [] # to store all output
   for x in list_of_numbers: # iteration logic
      x+=1 # Step 1
      x=str(x) # Step 2
      x+="_suffix" # Step 3
      modified_list.append(x) # append to final output
   return modified_list # return

Context

  1. In the provided example, suppose currently I have a single node which performs all of the steps.
  2. So the current pipeline has one node which takes 1 input and returns 1 output.
  3. As the complexity of my steps increases, I want to expose them as individual nodes. So I create another pipeline with these 3 steps as individual nodes and connect them together via their inputs and outputs (roughly as sketched after this list).
  4. But my overall requirement is unchanged: I want to iterate over all values in list_of_numbers, and for each element in this list I want to call this new pipeline. Finally, I want to merge the outputs of all runs and generate one output.
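
For concreteness, the per-element pipeline I have in mind would look roughly like this (the function, dataset and pipeline names are placeholders I made up, nothing that exists yet):

from kedro.pipeline import Pipeline, node

def add_one(x: int) -> int:
    return x + 1          # Step 1

def to_string(x: int) -> str:
    return str(x)         # Step 2

def add_suffix(x: str) -> str:
    return x + "_suffix"  # Step 3

per_element_pipeline = Pipeline([
    node(add_one, inputs="number", outputs="incremented"),
    node(to_string, inputs="incremented", outputs="stringified"),
    node(add_suffix, inputs="stringified", outputs="suffixed"),
])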

This seems somewhat similar to a dynamic graph (multiple dynamic instances of a pipeline) which expands based on the dataset.

Additional points to consider:

  1. My input is a single file. Say I partition the dataset based on some logic defined in a node. This node could then have multiple outputs (the exact count depends entirely on the dataset; here, the size of the list).
  2. For each output of this data-iterator node, I need to "spawn" one pipeline.
  3. Finally, merge the outputs of all "spawned" pipelines (this logic could again be defined in a merge node with multiple dynamic inputs); a rough sketch follows below.
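
Very roughly, the data-iterator and merge nodes I am imagining would be something like this (purely illustrative; the function names are made up, and I realise a plain dict is not really "multiple dynamic outputs" in Kedro terms):

from typing import Dict, List

def partition_node(list_of_numbers: List[int]) -> Dict[str, int]:
    # one logical partition per element; the number of partitions depends on the data
    return {f"partition_{i}": x for i, x in enumerate(list_of_numbers)}

def merge_node(partition_outputs: Dict[str, str]) -> List[str]:
    # collect the result of every "spawned" pipeline run into one final output
    return list(partition_outputs.values())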

Is there a way to do this? Thank you!


Solution

  • It looks like PartitionedDataSet or IncrementalDataSet might be of use to you.

    They allow you to segregate your similar data into separate chunks, determined by files, and repeat operations on those chunks as you see fit.

    So, rather than kicking off x pipelines containing y nodes, you would have one pipeline containing y nodes that processes x chunks of your data.

    More on IncrementalDataSet in this video: https://www.youtube.com/watch?v=v7JSSiYgqpg

    # nodes.py
    
    from typing import Any, Dict, Callable
    
    def _dict_mapper(dictionary: Dict[str, Any], fun: Callable):
      # Apply your function to the dictionary mapping
      return {k: fun(v) for k, v in dictionary.items()}
    
    def node_0(list_of_strings: Dict[str, str]):
      return _dict_mapper(list_of_strings, lambda x: int(x))
    
    def node_1(list_of_numbers: Dict[str, int]):
      return _dict_mapper(list_of_numbers, lambda x: x+1)
    
    def node_2(list_of_numbers: Dict[str, int]):
      return _dict_mapper(list_of_numbers, lambda x: str(x))
    
    def node_3(list_of_strings: Dict[str, str]):
      return _dict_mapper(list_of_strings, lambda x: f'{x}_suffix')
    
    
    # catalog.yml
    data:
      type: IncrementalDataSet
      dataset: text.TextDataSet
      path: folder/with/text_files/each/containing/single/number/
      filename_suffix: .txt
    
    # pipeline.py
    
    from kedro.pipeline import Pipeline, node
    
    Pipeline([
      node(node_0, inputs='data', outputs='0'),
      node(node_1, inputs='0', outputs='1'),
      node(node_2, inputs='1', outputs='2'),
      node(node_3, inputs='2', outputs='final_output'),
    ])
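
    If you also want to persist the merged result chunk-by-chunk, a PartitionedDataSet entry along these lines could work for the final output (a sketch only; the output path is a placeholder, and final_output is the dict of processed chunks returned by node_3):

    # catalog.yml
    final_output:
      type: PartitionedDataSet
      dataset: text.TextDataSet
      path: folder/for/output_text_files/
      filename_suffix: .txt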