Specify machine type for a single TFX pipeline component in Vertex AI


I'm using TFX to build an AI pipeline on Vertex AI. I followed this tutorial to get started, then adapted the pipeline to my own data, which has over 100M rows of time series data. A couple of my components get killed midway because they run out of memory, so I'd like to set the memory requirements for these components only. I use KubeflowV2DagRunner to orchestrate and launch the pipeline in Vertex AI with the following code:

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(
        default_image='gcr.io/watch-hop/hop-tfx-covid:0.6.2'
    ),
    output_filename=PIPELINE_DEFINITION_FILE)

_ = runner.run(
    create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_ROOT, metadata_path=METADATA_PATH))

A similar question has been answered on Stack Overflow, which has led me to a way to set memory requirements in AI Platform, but these configs don't exist anymore in KubeflowV2DagRunnerConfig, so I'm at a dead end.

Any help would be much appreciated.

** EDIT **
We define our components as Python functions with the @component decorator, so most of them are custom components. For Trainer components, I know you can specify the machine type using the tfx.Trainer class as explained in this tutorial, but my question is about custom components that do not do any training.


Solution

  • It turns out you can't at the moment, but according to this issue, the feature is coming.

    An alternative is to convert your TFX pipeline to a Kubeflow pipeline. Vertex AI Pipelines supports Kubeflow pipelines, and with these you can set memory and CPU constraints at the component level.

    from kfp.dsl import component, pipeline, Dataset, Input

    @component  # imported from kfp.dsl
    def MyComponent(input_data: Input[Dataset]):
        ...

    @pipeline  # imported from kfp.dsl
    def MyPipeline(...):
        task = MyComponent(...)
        task.set_memory_limit('64G')  # alternative to set_memory_request(...)
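
    If several components need different limits, one way to keep the settings in one place is a small helper like the sketch below. It is not part of KFP or TFX: `apply_resources` and the `RESOURCES` table are hypothetical names, and the values are placeholders. It only assumes the task object returned by calling a component exposes `set_memory_limit` and `set_cpu_limit`, as KFP tasks do.

```python
from typing import Any, Optional

# Hypothetical per-component resource table; keys and values are illustrative only.
RESOURCES = {
    "heavy": {"memory": "64G", "cpu": "8"},
    "light": {},  # no overrides: keep the platform defaults
}


def apply_resources(task: Any,
                    memory: Optional[str] = None,
                    cpu: Optional[str] = None) -> Any:
    """Set limits on a single pipeline task and return it.

    Resources left as None are not touched, so the task keeps its defaults.
    """
    if memory is not None:
        task.set_memory_limit(memory)
    if cpu is not None:
        task.set_cpu_limit(cpu)
    return task
```

    Inside the pipeline function this would be called as `apply_resources(MyComponent(...), **RESOURCES["heavy"])`, so only the components that were being killed get the larger machine.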