I want to write a pipeline in Kubeflow Pipelines that has two components, A and B.
The output of A is a list of image paths.
I want to run a Docker image (B) for each image path.
From what I can see, the dsl.ContainerOp of B can wait for the output of A, but I don't know how to create multiple instances of B.
Update: This has changed recently and can now be done simply by using ParallelFor over the output. Refer: https://stackoverflow.com/a/59292863/4438213
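As a rough sketch of what the ParallelFor approach can look like, the block below assumes the KFP v1 SDK (`dsl.ContainerOp`, `dsl.ParallelFor`) and that A writes its paths as a JSON array, since the loop fans out over a JSON list. The image names, commands, and the `/tmp/paths.json` file are placeholders of mine, not part of the linked answer.

```python
import json
from typing import List


def encode_paths(paths: List[str]) -> str:
    """Serialise image paths as a JSON array, the form the loop iterates over."""
    return json.dumps(paths)


def build_pipeline():
    """Return a pipeline function in which B runs once per path emitted by A."""
    # Imported here so encode_paths stays usable without the SDK installed.
    from kfp import dsl

    @dsl.pipeline(name="b-for-each-image")
    def pipeline():
        # Placeholder A: writes a JSON list of paths to a file output.
        a = dsl.ContainerOp(
            name="A",
            image="alpine:3.18",  # hypothetical image
            command=["sh", "-c"],
            arguments=["echo '%s' > /tmp/paths.json" % encode_paths(["a.png", "b.png"])],
            file_outputs={"paths": "/tmp/paths.json"},
        )
        # One instance of B per element of A's output, resolved at run time.
        with dsl.ParallelFor(a.output) as image_path:
            dsl.ContainerOp(
                name="B",
                image="alpine:3.18",  # hypothetical image
                command=["echo"],
                arguments=[image_path],
            )

    return pipeline
```

The function returned by `build_pipeline()` can then be handed to the SDK compiler like any other pipeline function.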
----- Below for KF 0.6 and before ----
This is a recognized issue with the Kubeflow DSL: using the output of one component (A) and iterating over it, running a new component (B) for each entry in that output. It is hard because the DSL Kubeflow uses is evaluated at compile time, and at that point it is not possible to know how many elements the output will contain.
Ref:
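To make the compile-time limitation concrete, here is a toy model in plain Python. It does not use the Kubeflow SDK; `TaskOutput` is a hypothetical stand-in for the placeholder object that represents a step's output while the pipeline is being compiled.

```python
class TaskOutput:
    """Hypothetical stand-in for a step output at compile time: a name, no value."""

    def __init__(self, task_name: str):
        self.task_name = task_name

    def __str__(self) -> str:
        # All the compiler has is a reference to be filled in when the step runs.
        return "{{tasks.%s.outputs.result}}" % self.task_name


def try_to_loop(output: TaskOutput) -> str:
    """Attempt a plain for loop over a placeholder, as a pipeline function would."""
    try:
        for _ in output:  # there is no list here yet, only a reference
            pass
    except TypeError:
        return "cannot iterate: %s has no value until run time" % output
    return "iterated"


print(try_to_loop(TaskOutput("A")))
```

The loop fails because the number of elements only exists once A has actually run, which is why a run-time construct is needed.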
The only form of dynamic (run-time) iteration supported as of KF v0.6 is dsl-recursion. Pending work on the issues above, I've made it work in 2 ways.
If the size of the result of A is constant across runs and known in advance, this is straightforward.
CASE A: The size of the output from the previous step is known
import kfp.components as comp

# Plain Python helper that extracts one path from
# the delimited string of refs the previous step returns
def get_path(str_of_paths: str, idx: int) -> str:
    return str_of_paths.split(" ")[idx]  # or some other delimiter

# Wrap the helper as a lightweight component (any appropriate base image works)
get_img_path_comp = comp.func_to_container_op(get_path, base_image='tensorflow/tensorflow')
And then a regular for loop in your pipeline DSL code will work:
image_path_res = ContainerOp_A()  # run your container op
for idx in range(4):  # 4 is the known, fixed number of paths
    path = get_img_path_comp(image_path_res.output, idx)
    ContainerOp_B(path.output)
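Since get_path is ordinary Python, its behaviour can be checked locally before it is wrapped as a component. The sketch below repeats the helper and adds a hypothetical `paths_for_fixed_size` function to show why the hard-coded loop bound has to match what A actually returns; the sample paths are made up.

```python
from typing import List


def get_path(str_of_paths: str, idx: int) -> str:
    """Return the idx-th entry of a space-delimited string of paths."""
    return str_of_paths.split(" ")[idx]


def paths_for_fixed_size(str_of_paths: str, size: int) -> List[str]:
    """Mimic the fixed-size loop: one get_path call per index in range(size)."""
    return [get_path(str_of_paths, idx) for idx in range(size)]


sample = "img/0.png img/1.png img/2.png img/3.png"
print(paths_for_fixed_size(sample, 4))

# If A returns fewer entries than the loop assumes, the lookup fails.
try:
    paths_for_fixed_size("img/0.png img/1.png", 4)
except IndexError as err:
    print("too few paths:", err)
```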
CASE B: When the output of the previous step is not of fixed size
This is a little tricky and intricate. The only form of dynamic looping Kubeflow allows as of KF v0.6 is dsl-recursion.
Option 1
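Create a lightweight component, sizer_op, that computes the number of entries in A's output, and then reuse the same get_img_path_comp from above.
from kfp import dsl

@dsl.component
def sizer_op(str_of_refs: str) -> int:
    return len(str_of_refs.split(" "))  # must be the same delimiter get_path uses
sizer_op_comp = comp.func_to_container_op(sizer_op, base_image='tensorflow/tensorflow')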
Then you can run the recursive function:
@dsl.component
def subtracter_op(cur_idx: int) -> int:
    return cur_idx - 1
sub_op_comp = comp.func_to_container_op(subtracter_op, base_image='tensorflow/tensorflow')
@dsl.graph_component
def recursive_run(list_of_images, cur_ref):
    # cur_ref counts the entries still to be processed
    with dsl.Condition(cur_ref > 0):
        # step down first so the index stays within 0..size-1
        next_ref = sub_op_comp(cur_ref)
        path = get_img_path_comp(list_of_images, next_ref.output)
        ContainerOp_B(path.output)
        # call recursively
        recursive_run(list_of_images, next_ref.output)
image_path_res = ContainerOp_A()  # run your container op
sizer = sizer_op_comp(image_path_res.output)
recursive_run(image_path_res.output, sizer.output)
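The control flow of the recursion can be traced in plain Python before committing to a pipeline run. This is only a model, assuming the counter holds the number of entries still to process and is decremented before it is used as an index; `trace_recursion` is a hypothetical helper and no Kubeflow objects are involved.

```python
from typing import List


def trace_recursion(str_of_refs: str, delimiter: str = " ") -> List[str]:
    """Return the entries in the order a count-down recursion would visit them."""
    entries = str_of_refs.split(delimiter)
    visited: List[str] = []

    def recursive_run(remaining: int) -> None:
        # Plays the role of the dsl.Condition guard: stop when nothing is left.
        if remaining > 0:
            idx = remaining - 1  # the subtracter step
            visited.append(entries[idx])  # where B would run for this entry
            recursive_run(idx)

    recursive_run(len(entries))  # the sizer step provides the starting count
    return visited


print(trace_recursion("a.png b.png c.png"))
```

Under these assumptions the entries are visited from last to first, and every index stays inside the bounds of the list.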
Option 2
After running ContainerOp_A, create a Kubeflow component that reads the results from ContainerOp_A, parses them in Python code, and then spawns new runs that run just ContainerOp_B using the KFP client. You can connect to the KF Pipelines client using:
from kfp import Client
kf_client = Client(host='localhost:9990')
Refer: kf_client
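A sketch of the spawning step in Option 2 might look like the following. It assumes the KFP client method `create_run_from_pipeline_package`, a pre-compiled package for B at a hypothetical path `b_pipeline.yaml`, and a pipeline parameter named `image_path`; none of these names come from the answer itself. The client is passed in as an argument so the function can be exercised with a stub.

```python
from typing import Any, List


def spawn_runs_for_paths(client: Any, str_of_paths: str,
                         package: str = "b_pipeline.yaml",  # hypothetical package
                         delimiter: str = " ") -> List[Any]:
    """Submit one run of B's pipeline per path parsed from A's output."""
    results = []
    paths = [p for p in str_of_paths.split(delimiter) if p]  # drop empty entries
    for idx, path in enumerate(paths):
        results.append(
            client.create_run_from_pipeline_package(
                package,
                arguments={"image_path": path},  # assumed parameter name
                run_name="component-b-%d" % idx,
            )
        )
    return results


# Usage inside the spawning component (needs a reachable KFP API server):
#   from kfp import Client
#   spawn_runs_for_paths(Client(host="localhost:9990"), "a.png b.png")
```

Each path becomes a separate run rather than a step inside the current run, so the results of B have to be collected from those runs afterwards.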