python-3.x  google-cloud-platform  google-cloud-vertex-ai  kubeflow-pipelines  vertex-ai-pipeline

Kubeflow - how to pass Tensorflow Dataset and Tensors from one component to another?


I am implementing a Kubeflow pipeline in Vertex AI. Basically I have two components: prepare_data and train_model:

@component(
    packages_to_install=[
        "pandas==1.3.4",
        "numpy==1.20.3",
        "unidecode",
        "nltk==3.6.5",
        "gcsfs==2023.1.0",
    ],
)
def prepare_data(
    dataset: str,
    data_artifact: Output[Dataset],
) -> NamedTuple("Outputs", [("ratings", Dataset), ("movies", Dataset), ("train", Dataset), ("test", Dataset)]):

and...

@component(
    packages_to_install=[
        "tensorflow-recommenders==0.7.0",
        "tensorflow==2.9.1",
    ],
)
def train_model(
    epochs: int,
    ratings: Input[Dataset],
    movies: Input[Dataset],
    train: Input[Dataset],
    test: Input[Dataset],
    model_artifact: Output[Model],
) -> NamedTuple("Outputs", [("model_artifact", Model)]):

prepare_data generates four TensorFlow datasets (movies, ratings, train and test) that are used inside the train_model component.

How do I save (or reference) these datasets from prepare_data to be used inside train_model? For instance, I get the following error:

AttributeError: 'Dataset' object has no attribute 'map'

For this line of code:

user_ids = ratings.map(lambda x: x["requisito"])

My pipeline looks like this:

@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT + "data-pipeline",
    name="pipeline-with-deployment",
)
def pipeline():
    prepare_data_op = prepare_data('gs://bucket-777/data.csv').set_cpu_limit('16').set_memory_limit('32G').set_caching_options(False)

    training_op = train_model(
        3,
        prepare_data_op.outputs["ratings"],
        prepare_data_op.outputs["movies"],
        prepare_data_op.outputs["train"],
        prepare_data_op.outputs["test"],
    ).set_cpu_limit('16').set_memory_limit('32G').set_caching_options(False)

    deploy_op = deploy_model(training_op.outputs["model_artifact"], "projectid", "us-central1")

training_op.outputs["model_artifact"] is an index for similarity search. The whole thing works perfectly as a single combined data-and-train component, but when I split it into two components, the datasets do not keep their properties.

Any ideas on how to overcome this issue are welcome.

I checked this Stack Overflow question (here), but I am unsure how to do this with TensorFlow Datasets and Tensors.

[Image: Pipeline]


Solution

  • This is not supported yet. It is a planned feature that the Kubeflow team intends to implement, as seen here:

    https://github.com/kubeflow/pipelines/issues/8899#issuecomment-1452764426
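
Until then, one possible workaround, offered as a sketch and not as the approach described in the linked issue: the error in the question suggests that ratings inside train_model is the pipeline's Dataset artifact (a handle with a storage path), not a tf.data.Dataset. The producer would therefore write its data to the artifact's path, and the consumer would reload it from there. The example below shows that round trip using only the standard library. ArtifactHandle, prepare_data_step, train_model_step, the "rating" field and the JSON Lines format are all illustrative assumptions. With the TensorFlow 2.9.1 pinned in the question, tf.data.experimental.save and tf.data.experimental.load would be the natural replacements for the JSON writer and reader, but this sketch does not exercise them.

```python
import json
import os
import tempfile
from dataclasses import dataclass


@dataclass
class ArtifactHandle:
    """Hypothetical stand-in for a pipeline Dataset artifact: a path, not the data."""
    path: str


def prepare_data_step(ratings_out: ArtifactHandle) -> None:
    """Producer: build the records, then persist them at the artifact path."""
    # Illustrative records; only the "requisito" key comes from the question.
    records = [
        {"requisito": "r1", "rating": 4.0},
        {"requisito": "r2", "rating": 2.5},
    ]
    with open(ratings_out.path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")


def train_model_step(ratings_in: ArtifactHandle) -> list:
    """Consumer: reload the records from the path before using them."""
    with open(ratings_in.path, encoding="utf-8") as f:
        ratings = [json.loads(line) for line in f]
    # Only the reloaded object supports data operations such as mapping.
    return [row["requisito"] for row in ratings]


# Simulate the two steps sharing one artifact location.
workdir = tempfile.mkdtemp()
handle = ArtifactHandle(path=os.path.join(workdir, "ratings.jsonl"))
prepare_data_step(handle)
user_ids = train_model_step(handle)
```

Here handle itself has no map attribute, which mirrors the AttributeError above; only the object reloaded from handle.path can be iterated or mapped.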