Search code examples
python · kubeflow · kubeflow-pipelines

Issue when trying to pass data between Kubeflow components using files


I made two components using Python functions and I am trying to pass data between them using files, but I am unable to do so. I want to calculate the sum in the first component and then send the result to the other component through a file. Below is the partial code (the code works without the file passing). Please assist.

# Define your components code as standalone python functions:======================
    def add(a: float, b: float, f: comp.OutputTextFile(float)) -> NamedTuple(
        'AddOutput',
        [
            ('out', comp.OutputTextFile(float))
        ]):
        '''Calculates sum of two arguments'''
        # NOTE(review): `sum` shadows the builtin of the same name inside this
        # function.
        sum = a+b

        # The numeric result is handed to f.write() as-is, without being
        # converted to a string first.
        f.write(sum)

        from collections import namedtuple

        addOutput = namedtuple(
            'AddOutput',
            ['out'])
        # The file handle `f` itself (not the computed sum) is what gets
        # wrapped in the named tuple and returned as the 'out' field.
        return addOutput(f)  # the metrics will be uploaded to the cloud


    def multiply(c:float, d:float, f: comp.InputTextFile(float) ):
        '''Calculates the product'''
        # `product` is computed but never used again in this function.
        product = c * d

        # Prints the raw contents of the input file; the value read is not
        # combined with `product`.
        print(f.read())


# Build one component per function; each call is also given a YAML file name
# through output_component_file. Two different `comp` helpers are used here.
add_op = comp.func_to_container_op(add, output_component_file='add_component.yaml')
    product_op = comp.create_component_from_func(multiply, 
output_component_file='multiple_component.yaml')


@dsl.pipeline(
      name='Addition-pipeline',
      description='An example pipeline that performs addition calculations.'
    )
    def my_pipeline(a, b='7', c='4', d='1'):

        # NOTE(review): pl_comp_list is not defined anywhere in the code shown;
        # presumably it holds the two component factories - confirm.
        add_op = pl_comp_list[0]
        product_op = pl_comp_list[1]

        # The literal 4 is passed as the second addend, so parameter `b` is
        # not used by this pipeline body.
        first_add_task = add_op(a, 4)
        # The 'out' output of the add task is passed as the third argument of
        # the multiply component.
        second_add_task = product_op(c, d, first_add_task.outputs['out'])

Solution

  • Here is a slightly simplified version of your pipeline that I tested and that works. The type you pass to OutputTextFile and InputTextFile doesn't matter: the contents are always written and read as str. So this is what you should change:

    • While writing to OutputTextFile: cast sum_ from float to str
    • While reading from InputTextFile: cast f.read() value from str to float
    import kfp
    from kfp import dsl
    from kfp import components as comp
    
    
    def add(a: float, b: float, f: comp.OutputTextFile()):
        '''Calculates sum of two arguments'''
        total = a + b
        # The output file is written as text, so the numeric result is
        # rendered with str() before it goes into the handle.
        rendered = str(total)
        f.write(rendered)
        # The numeric (unrendered) sum is what the function hands back.
        return total
    
    
    def multiply(c: float, d: float, f: comp.InputTextFile()):
        '''Calculates the product'''
        # The upstream value arrives as text; parse it back into a float
        # before folding it into the product.
        upstream = float(f.read())
        result = c * d * upstream
        print(result)
        return result
    
    
    # Turn both plain functions into component factories; each call is also
    # given a component YAML file name through output_component_file.
    add_op = comp.func_to_container_op(
        add,
        output_component_file='add_component.yaml',
    )
    product_op = comp.create_component_from_func(
        multiply,
        output_component_file='multiple_component.yaml',
    )
    
    
    @dsl.pipeline(
        description='An example pipeline that performs addition calculations.',
        name='Addition-pipeline',
    )
    def my_pipeline(a, b='7', c='4', d='1'):
        # Step 1: add the two pipeline parameters.
        sum_task = add_op(a, b)
        # Step 2: pass the add step's output as the third argument of the
        # multiply component, alongside c and d.
        product_task = product_op(c, d, sum_task.output)
    
    
    if __name__ == "__main__":
        # The compiled pipeline is written to this script's own path with
        # ".yaml" appended.
        compiled_name = "{}.yaml".format(__file__)
        kfp.compiler.Compiler().compile(
            pipeline_func=my_pipeline,
            package_path=compiled_name,
        )