How to pass data or files between Kubeflow containerized components in Python


I'm exploring Kubeflow as an option to deploy and connect the various components of a typical ML pipeline. I'm using Docker containers as Kubeflow components, and so far I've been unable to use the ContainerOp file_outputs argument to pass results between components.

Based on my understanding of the feature, creating and saving to a file that's declared as one of the file_outputs of a component should cause it to persist and be accessible for reading by the following component.

This is how I attempted to declare this in my pipeline python code:

import kfp.dsl as dsl 
import kfp.gcp as gcp

@dsl.pipeline(name='kubeflow demo')
def pipeline(project_id='kubeflow-demo-254012'):
    data_collector = dsl.ContainerOp(
        name='data collector', 
        image='eu.gcr.io/kubeflow-demo-254012/data-collector',
        arguments=[ "--project_id", project_id ],
        file_outputs={ "output": '/output.txt' }
    )   
    data_preprocessor = dsl.ContainerOp(
        name='data preprocessor',
        image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
        arguments=[ "--project_id", project_id ]
    )
    data_preprocessor.after(data_collector)
    # TODO: add other components

if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, __file__ + '.tar.gz')

In the Python code for the data-collector.py component, I fetch the dataset and then write it to output.txt. I can read the file from within the same component, but inside data-preprocessor.py the same read raises a FileNotFoundError.

Is the use of file_outputs invalid for container-based Kubeflow components, or am I using it incorrectly in my code? If it's not an option in my case, is it possible to programmatically create Kubernetes volumes inside the pipeline declaration Python code and use them instead of file_outputs?


Solution

  • Files created in one Kubeflow pipeline component are local to that component's container. To use an output in a subsequent step, you need to pass it to that step as an argument:

    data_preprocessor = dsl.ContainerOp(
        name='data preprocessor',
        image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
        # The contents of /output.txt from data_collector are substituted here
        arguments=["--fetched_dataset", data_collector.outputs['output'],
                   "--project_id", project_id]
    )


    Note: data_collector.outputs['output'] will contain the actual string contents of the file /output.txt, not a path to the file. If you want it to contain a path instead, write the dataset to shared storage (such as S3 or a mounted PVC volume) and write the path or link to that storage location into /output.txt. data_preprocessor can then read the dataset from that path.
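
For the container side, here is a minimal sketch of the path-passing pattern described in the note. It runs locally without Kubeflow. The helper names (write_output, parse_preprocessor_args, read_dataset), the file name dataset.csv, and the temporary directory standing in for a mounted PVC are assumptions made for illustration; only the --fetched_dataset and --project_id flags and the /output.txt convention come from the pipeline above.

```python
import argparse
import os
import tempfile


def write_output(contents: str, output_path: str) -> None:
    """Collector side: write the value that the pipeline exposes as outputs['output']."""
    with open(output_path, "w") as f:
        f.write(contents)


def parse_preprocessor_args(argv):
    """Preprocessor side: the upstream file contents arrive as a plain string argument."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--fetched_dataset", required=True)
    parser.add_argument("--project_id", required=True)
    return parser.parse_args(argv)


def read_dataset(path_from_upstream: str) -> str:
    """Treat the passed string as a path on shared storage and load the dataset."""
    with open(path_from_upstream.strip()) as f:
        return f.read()


# --- Local simulation of the two steps (assumption: a temp dir plays the shared volume) ---
shared_dir = tempfile.mkdtemp()
dataset_path = os.path.join(shared_dir, "dataset.csv")  # hypothetical file name

# data-collector: store the dataset on shared storage, then write only its path
with open(dataset_path, "w") as f:
    f.write("a,b\n1,2\n")
output_file = os.path.join(shared_dir, "output.txt")  # stands in for /output.txt
write_output(dataset_path, output_file)

# The pipeline would read the declared output file and substitute its contents
# into the next step's arguments; here that substitution is done by hand.
with open(output_file) as f:
    passed_value = f.read()

# data-preprocessor: parse the arguments and load the dataset from the path
args = parse_preprocessor_args(
    ["--fetched_dataset", passed_value, "--project_id", "kubeflow-demo-254012"]
)
dataset = read_dataset(args.fetched_dataset)
```

In a real pipeline both containers would need the same volume or bucket available for the path to resolve. If the dataset is small, the collector could instead write the data itself to /output.txt and the preprocessor could use args.fetched_dataset directly as the data.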