I'm not sure if this is possible, but here is what I am trying to do:
I want to extract portions (steps) of a function as individual nodes (fine so far), but the catch is that I have an iterator on top of the steps which depends on some logic over the dataset, i.e. the same independent operations are repeated on logical partitions of a dataset.
def single_node(list_of_numbers):
    modified_list = []             # to store all output
    for x in list_of_numbers:      # iteration logic
        x += 1                     # Step 1
        x = str(x)                 # Step 2
        x += "_suffix"             # Step 3
        modified_list.append(x)    # append to final output
    return modified_list           # return
Now the input would be list_of_numbers, and for each element in this list I want to call this new pipeline. Finally, I want to merge the outputs of all runs and generate one output. This seems somewhat similar to a dynamic graph (multiple dynamic instances of a pipeline) which expands based on the dataset.
Is there a way to do this? Thank you!
It looks like PartitionedDataSet or IncrementalDataSet might be of use to you.
They allow you to segregate similar data into separate chunks, determined by files, and repeat operations on those chunks as you see fit.
So, rather than kicking off x pipelines containing y nodes each, you would have one pipeline containing y nodes which processes x chunks of your data.
More on IncrementalDataSet in this video: https://www.youtube.com/watch?v=v7JSSiYgqpg
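Applied to your example, it could look something like the snippets below. Note that an IncrementalDataSet loads as a dictionary mapping each partition id (here, the file name) to its loaded contents, which is why every node maps its step over the whole dictionary: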
# nodes.py
from typing import Any, Callable, Dict

def _dict_mapper(dictionary: Dict[str, Any], fun: Callable):
    # Apply the function to every value (partition) in the dictionary
    return {k: fun(v) for k, v in dictionary.items()}

def node_0(list_of_strings: Dict[str, str]):
    return _dict_mapper(list_of_strings, int)

def node_1(list_of_numbers: Dict[str, int]):
    return _dict_mapper(list_of_numbers, lambda x: x + 1)

def node_2(list_of_numbers: Dict[str, int]):
    return _dict_mapper(list_of_numbers, str)

def node_3(list_of_strings: Dict[str, str]):
    return _dict_mapper(list_of_strings, lambda x: f'{x}_suffix')
# catalog.yml
data:
  type: IncrementalDataSet
  dataset: text.TextDataSet
  path: folder/with/text_files/each/containing/single/number/
  filename_suffix: .txt
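As an aside: if you preferred PartitionedDataSet (also mentioned above), the catalog entry would be almost identical, but a PartitionedDataSet loads lazily, handing your first node a dictionary of partition ids to load functions rather than to the data itself, so node_0 would have to call each value first (e.g. fun(v()) inside _dict_mapper). A sketch of that variant:

# catalog.yml -- PartitionedDataSet variant (sketch)
data:
  type: PartitionedDataSet
  dataset: text.TextDataSet
  path: folder/with/text_files/each/containing/single/number/
  filename_suffix: .txt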
# pipeline.py
from kedro.pipeline import Pipeline, node

Pipeline([
    node(node_0, inputs='data', outputs='0'),
    node(node_1, inputs='0', outputs='1'),
    node(node_2, inputs='1', outputs='2'),
    node(node_3, inputs='2', outputs='final_output'),
])
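Since you also want to merge everything into a single output at the end, one way would be a final node that collapses the partition dictionary into one list. A sketch (the name merge_partitions is my own):

# nodes.py (continued) -- hypothetical merge step
def merge_partitions(partitions: Dict[str, str]) -> list:
    # Collapse the per-partition results into a single list, sorted by
    # partition id so the output order is deterministic
    return [partitions[k] for k in sorted(partitions)]

# pipeline.py -- append to the node list above:
#     node(merge_partitions, inputs='final_output', outputs='merged_output'),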