python kubeflow kubeflow-pipelines

kubeflow ParallelFor using the previous containerop output


I can create a static for loop using

with dsl.ParallelFor([1,2,3]) as item:
   ....

How can I use a container_op.output as an input to ParallelFor?
Assume the first container outputs an integer n, and then I want to run ParallelFor n times.

Attempts like this do not work:

container_op = ContainerOp(...)
with dsl.ParallelFor(container_op.output) as item:
   ....

I'm trying to simulate a parallel Python range(n) function.


Solution

  • Support for withItems (static looping) and withParam (dynamic looping) was added in several parts, and all of them are available now. Refer to the PR.

    Ensure that your KFP version is 0.1.31 or above.

    It's possible to loop over the output of a previous container_op, provided that output is a JSON list, as below:

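    from kfp import dsl

    # The pipeline name is illustrative; ops must be created inside a pipeline function.
    @dsl.pipeline(name='parallel-for-example')
    def pipeline():
        # The first step writes a JSON list, exposed as its single output.
        echo_op = dsl.ContainerOp(
            name='echo',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=['echo "[1,2,3]" > /tmp/output.txt'],
            file_outputs={'output': '/tmp/output.txt'})

        # One 'iterate' task is created per element of that list.
        with dsl.ParallelFor(echo_op.output) as item:
            iterate_op = dsl.ContainerOp(
                name='iterate',
                image='library/bash:4.4.23',
                command=['sh', '-c'],
                arguments=[f"echo {item} > /tmp/output.txt"],
                file_outputs={'output': '/tmp/output.txt'})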
    

    Ensure that your output YAML looks something like this:

            name: for-loop-for-loop-3c29048d-1
            template: for-loop-for-loop-3c29048d-1
            withParam: '{{tasks.echo.outputs.parameters.echo-output}}'
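
To get the range(n) behaviour asked about in the question, the upstream step would need to emit a JSON list rather than the bare integer n, since the loop iterates over the elements of that list (as with the "[1,2,3]" example above). The following is a minimal sketch of what the first container could run. The helper names range_as_json and write_loop_items are hypothetical and not part of KFP; the path is assumed to be whatever the step declares in file_outputs.

```python
import json


def range_as_json(n: int) -> str:
    """Serialize range(n) as a JSON list string, e.g. 3 -> "[0, 1, 2]"."""
    if n < 0:
        raise ValueError("n must be non-negative")
    return json.dumps(list(range(n)))


def write_loop_items(n: int, path: str) -> None:
    """Write the JSON list to the file the step exposes as its output.

    `path` is assumed to match the step's file_outputs entry,
    for example '/tmp/output.txt' in the answer above.
    """
    with open(path, "w") as f:
        f.write(range_as_json(n))
```

With this, the first step would call write_loop_items(n, '/tmp/output.txt') instead of writing n itself, and its output could then be passed to dsl.ParallelFor in the same way as echo_op.output.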