I am implementing a Kubeflow pipeline in Vertex AI. Basically, I have two components, `prepare_data` and `train_model`:
```python
@component(
    packages_to_install=[
        "pandas==1.3.4",
        "numpy==1.20.3",
        "unidecode",
        "nltk==3.6.5",
        "gcsfs==2023.1.0",
    ],
)
def prepare_data(
    dataset: str,
    data_artifact: Output[Dataset],
) -> NamedTuple(
    "Outputs",
    [("ratings", Dataset), ("movies", Dataset), ("train", Dataset), ("test", Dataset)],
):
    ...
```
and...
```python
@component(
    packages_to_install=[
        "tensorflow-recommenders==0.7.0",
        "tensorflow==2.9.1",
    ],
)
def train_model(
    epochs: int,
    ratings: Input[Dataset],
    movies: Input[Dataset],
    train: Input[Dataset],
    test: Input[Dataset],
    model_artifact: Output[Model],
) -> NamedTuple("Outputs", [("model_artifact", Model)]):
    ...
```
`prepare_data` generates four TensorFlow datasets (`movies`, `ratings`, `train`, and `test`) that will be used inside the `train_model` component.
How do I save (or reference) these datasets in `prepare_data` so that they can be used inside `train_model`? At the moment I get the following error:

```
AttributeError: 'Dataset' object has no attribute 'map'
```

for this line of code:

```python
user_ids = ratings.map(lambda x: x["requisito"])
```
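As far as I can tell, inside `train_model` the `ratings` argument is a KFP artifact handle (metadata plus a pointer to storage), not a `tf.data.Dataset`, which would explain why `.map` does not exist. A quick sketch of what the component actually receives:

```python
# Inside train_model: `ratings` is a KFP Dataset artifact, not a tf.data.Dataset.
print(type(ratings))   # the kfp dsl Dataset artifact class
print(ratings.uri)     # e.g. gs://.../ratings (where the artifact lives)
print(ratings.path)    # the corresponding local /gcs/... mount path
# None of tf.data's methods (map, batch, ...) exist on this object.
```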
My pipeline looks like this:

```python
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT + "data-pipeline",
    name="pipeline-with-deployment",
)
def pipeline():
    prepare_data_op = (
        prepare_data(dataset="gs://bucket-777/data.csv")
        .set_cpu_limit("16")
        .set_memory_limit("32G")
        .set_caching_options(False)
    )
    training_op = (
        train_model(
            epochs=3,
            ratings=prepare_data_op.outputs["ratings"],
            movies=prepare_data_op.outputs["movies"],
            train=prepare_data_op.outputs["train"],
            test=prepare_data_op.outputs["test"],
        )
        .set_cpu_limit("16")
        .set_memory_limit("32G")
        .set_caching_options(False)
    )
    deploy_op = deploy_model(training_op.outputs["model_artifact"], "projectid", "us-central1")
```
`training_op.outputs["model_artifact"]` is an index for similarity search. The whole thing works perfectly as a single `data-train` component, but when I split it in two, the datasets do not keep their properties.
Any ideas on how to overcome this issue are welcome.
I checked this Stack Overflow question (here), but I am unsure how to apply it to TensorFlow Datasets and Tensors.
This will be implemented by the Kubeflow team in the future. It's a planned feature, as seen here:
https://github.com/kubeflow/pipelines/issues/8899#issuecomment-1452764426
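In the meantime, a possible workaround is to serialize each `tf.data.Dataset` inside `prepare_data` and rebuild it inside `train_model`. The sketch below is an illustration under several assumptions, not the planned native support: it replaces the `NamedTuple` returns with hypothetical `Output[Dataset]` parameters (`ratings_out` etc. are invented names), uses `tf.data.experimental.save`/`load` (the stable `Dataset.save`/`Dataset.load` methods only arrived after `tensorflow==2.9.1`), stands in a toy dataset for the real preprocessing, and shows only two of the four datasets (`train` and `test` would be handled the same way):

```python
from kfp.v2.dsl import component, Dataset, Input, Model, Output  # kfp v2 style, matching
                                                                 # the question; on kfp>=2
                                                                 # import from kfp.dsl

@component(packages_to_install=["tensorflow==2.9.1"])
def prepare_data(
    dataset: str,
    ratings_out: Output[Dataset],  # hypothetical Output parameters replacing
    movies_out: Output[Dataset],   # the NamedTuple return values
):
    import tensorflow as tf

    # Toy stand-in for the real preprocessing of `dataset`.
    ratings_ds = tf.data.Dataset.from_tensor_slices({"requisito": ["u1", "u2", "u3"]})
    movies_ds = tf.data.Dataset.from_tensor_slices(["m1", "m2"])

    # Serialize each tf.data.Dataset under its artifact path. On Vertex AI the
    # path is the /gcs/... mount of the artifact URI, so the data lands in GCS.
    tf.data.experimental.save(ratings_ds, ratings_out.path)
    tf.data.experimental.save(movies_ds, movies_out.path)


@component(packages_to_install=["tensorflow-recommenders==0.7.0", "tensorflow==2.9.1"])
def train_model(
    epochs: int,
    ratings: Input[Dataset],
    movies: Input[Dataset],
    model_artifact: Output[Model],
):
    import tensorflow as tf

    # Rebuild real tf.data.Dataset objects from the serialized artifacts;
    # the element_spec is recovered automatically from the saved metadata.
    ratings_ds = tf.data.experimental.load(ratings.path)
    movies_ds = tf.data.experimental.load(movies.path)

    user_ids = ratings_ds.map(lambda x: x["requisito"])  # .map works again
    # ... build and train the TFRS model as before, then write it under
    # model_artifact.path ...
```

The pipeline wiring would stay the same: `prepare_data_op.outputs["ratings_out"]` (or whatever the output parameters are named) is passed to `train_model`, and only the component bodies change to save/load the datasets explicitly.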