Tags: python, minio, kubeflow, kubeflow-pipelines, tfx

Is there an implemented way to use a kubeflow pipeline's output outside the pipeline?


I'm using local Kubeflow Pipelines to build a continuous machine learning test project. I have one pipeline that preprocesses the data using TFX, and it automatically saves its outputs to MinIO. Outside of this pipeline, I want to train the model using TFX's Trainer, but I need the artifacts generated in the preprocessing pipeline. Is there an implemented way to import these outputs? I've looked through the documentation and some issues, but can't find an answer. And because I'm trying to make this continuous, I can't rely on doing it manually.

Example of my preprocessing pipeline:


    @kfp.dsl.pipeline(
      name='TFX',
      description='TFX pipeline'
    )
    def tfx_pipeline():
    
        # Download the data with wget; could fetch from GCS instead
        fetch = kfp.dsl.ContainerOp(
          name='download',
          image='busybox',
          command=['sh', '-c'],
          arguments=[
              'sleep 1;'
              'mkdir -p /tmp/data;'
              'wget <gcp link> -O /tmp/data/results.csv'],
          file_outputs={'downloaded': '/tmp/data'})
        records_example = tfx_csv_gen(input_base=fetch.output)
        stats = tfx_statistic_gen(input_data=records_example.output)
        schema_op = tfx_schema_gen(stats.output)
        tfx_example_validator(stats=stats.outputs['output'], schema=schema_op.outputs['output'])
        #tag::tft[]
        transformed_output = tfx_transform(
            input_data=records_example.output,
            schema=schema_op.outputs['output'],
            module_file=module_file) # Path to your TFT code on GCS/S3
        #end::tft[]

and then compiling and running it with


    kfp.compiler.Compiler().compile(tfx_pipeline, 'tfx_pipeline.zip')


    client = kfp.Client()
    client.list_experiments()
    #exp = client.create_experiment(name='mdupdate')


    my_experiment = client.create_experiment(name='tfx_pipeline')
    my_run = client.run_pipeline(my_experiment.id, 'tfx', 
      'tfx_pipeline.zip')
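
To keep things continuous, the run id can be captured from the returned run and the run awaited programmatically (a sketch, assuming kfp's v1 Client API):

    # Sketch: capture the run id and block until the run finishes, so the
    # preprocessing artifacts exist before anything downstream reads them.
    run_id = my_run.id
    client.wait_for_run_completion(run_id, timeout=3600)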

I'm working in a .ipynb notebook in Visual Studio Code.


Solution

  • You can get that information like this: https://github.com/kubeflow/pipelines/issues/4327#issuecomment-687255001

    component_name: this can be found in the YAML definition of the compiled pipeline, under templates.name (search for the component containing the output you want)

    artifact_name: this can also be found in the YAML definition, under the outputs attribute of that same component
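
    If you'd rather script that lookup than read the YAML by hand, here is a minimal sketch (assuming the compiled package is the tfx_pipeline.zip produced above, with PyYAML installed) that lists each component together with its output artifact names:

    import zipfile

    import yaml

    # The compiled .zip wraps the Argo workflow definition (a single
    # pipeline.yaml inside the archive).
    with zipfile.ZipFile("tfx_pipeline.zip") as zf:
        workflow = yaml.safe_load(zf.read(zf.namelist()[0]))

    # Each template is one component; its outputs.artifacts carry the
    # artifact names used by read_artifact below.
    for template in workflow["spec"]["templates"]:
        artifacts = template.get("outputs", {}).get("artifacts", [])
        if artifacts:
            print(template["name"], "->", [a["name"] for a in artifacts])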

    Once you have these two parameters, you can use the functions described at the URL above:

    #!/usr/bin/env python3
    
    import json
    import tarfile
    from base64 import b64decode
    from io import BytesIO
    
    import kfp
    
    
    def get_node_id(*, run_id: str, component_name: str, client: kfp.Client):
        run = client.runs.get_run(run_id)
        workflow = json.loads(run.pipeline_runtime.workflow_manifest)
        nodes = workflow["status"]["nodes"]
        for node_id, node_info in nodes.items():
            if node_info["displayName"] == component_name:
                return node_id
        else:
            # for/else: reached only when no node matches the component name
            raise RuntimeError(f"Unable to find node_id for Component '{component_name}'")
    
    
    def get_artifact(*, run_id: str, node_id: str, artifact_name: str, client: kfp.Client):
        artifact = client.runs.read_artifact(run_id, node_id, artifact_name)
        # Artifacts are returned as base64-encoded .tar.gz strings
        data = b64decode(artifact.data)
        io_buffer = BytesIO()
        io_buffer.write(data)
        io_buffer.seek(0)
        data = None
        with tarfile.open(fileobj=io_buffer) as tar:
            member_names = tar.getnames()
            if len(member_names) == 1:
                data = tar.extractfile(member_names[0]).read().decode('utf-8')
            else:
                # Is it possible for KFP artifacts to have multiple members?
                data = {}
                for member_name in member_names:
                    data[member_name] = tar.extractfile(member_name).read().decode('utf-8')
        return data
    
    
    if __name__ == "__main__":
        run_id = "e498b0da-036e-4e81-84e9-6e9c6e64960b"
        component_name = "my-component"
        # For an output variable named "output_data"
        artifact_name = "my-component-output_data"
    
        client = kfp.Client()
        node_id = get_node_id(run_id=run_id, component_name=component_name, client=client)
        artifact = get_artifact(
            run_id=run_id, node_id=node_id, artifact_name=artifact_name, client=client,
        )
        # Do something with artifact ...
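
    Tying this back to the question, a hypothetical end-to-end usage (component and artifact names below are illustrative; take the real ones from the compiled YAML as described above):

    # Hypothetical: read the Transform output of the preprocessing run
    # started earlier (my_run from the question), then hand it to the Trainer.
    run_id = my_run.id
    node_id = get_node_id(run_id=run_id, component_name="tfx-transform", client=client)
    transformed = get_artifact(
        run_id=run_id, node_id=node_id, artifact_name="tfx-transform-output", client=client,
    )
    # `transformed` now holds the artifact contents, e.g. the minio location
    # of the transformed examples, ready for the Trainer outside the pipeline.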