Search code examples
pythongoogle-cloud-platformpipelinekubeflowgoogle-cloud-vertex-ai

Vertex AI Pipeline Failed Precondition


I have been following this video: https://www.youtube.com/watch?v=1ykDWsnL2LE&t=310s

Code located at: https://codelabs.developers.google.com/vertex-pipelines-intro#5 (I have done the last two steps as per the video which isn't an issue for google_cloud_pipeline_components version: 0.1.1)

I have created a pipeline in vertex ai which ran and used the following code to create the pipeline (from video not code extract in link above):

#run pipeline
response = api_client.create_run_from_job_spec(
    "tab_classif_pipeline.json", pipeline_root = PIPELINE_ROOT,
    parameter_values = {
    "project" : PROJECT_ID,
    "display_name" : DISPLAY_NAME
    }
)
    

and in the GCP logs I get the following error:

"google.api_core.exceptions.FailedPrecondition: 400 BigQuery Dataset location `eu` must be in the same location as the service location `us-central1`.

I get the error at the dataset_create_op stage:

    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
    project = project, display_name = display_name, bq_source = bq_source
)

My dataset is configured in EU (the whole region) so I don't understand where us-central1 is coming from (or what the service location is?).

Here is the all the code I have used:

 PROJECT_ID = "marketingtown"
 BUCKET_NAME = f"gs://lookalike_model"
 from typing import NamedTuple
 import kfp
 from kfp import dsl
 from kfp.v2 import compiler
 from kfp.v2.dsl import (Artifact, Input, InputPath, Model, Output, 
                            OutputPath, ClassificationMetrics, 
 Metrics, component)
 from kfp.v2.components.types.artifact_types import Dataset
 from kfp.v2.google.client import AIPlatformClient
 from google.cloud import aiplatform
 from google_cloud_pipeline_components import aiplatform as gcc_aip

 #set environment variables
 PATH = %env PATH
 %env PATH = (PATH)://home/jupyter/.local/bin
 REGION = "europe-west2"
    
 #cloud storage path where artifact is created by pipeline
 PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
 PIPELINE_ROOT
 import time
 DISPLAY_NAME = f"lookalike_model_pipeline_{str(int(time.time()))}"
 print(DISPLAY_NAME)
 
@kfp.dsl.pipeline(name = "lookalike-model-training-v2", 
pipeline_root = PIPELINE_ROOT)

def pipeline(
    bq_source : str = f"bq://{PROJECT_ID}.MLOp_pipeline_temp.lookalike_training_set",
    display_name : str = DISPLAY_NAME,
    project : str = PROJECT_ID,
    gcp_region : str = "europe-west2",
    api_endpoint : str = "europe-west2-aiplatform.googleapis.com",
    thresholds_dict_str : str = '{"auPrc" : 0.3}'
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project = project, display_name = display_name, bq_source = bq_source
    )
    
    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"categorical": {"column_name": "agentId"}},
            {"categorical": {"column_name": "postcode"}},
            {"categorical": {"column_name": "isMobile"}},
            {"categorical": {"column_name": "gender"}},
            {"categorical": {"column_name": "timeOfDay"}},
            {"categorical": {"column_name": "sale"}},
        ],
        dataset=dataset_create_op.outputs["dataset"], #dataset from previous step
        target_column="sale",
    )
    
    #outputted evaluation metrics
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )
    
    #if deployment threshold is mean, deploy
    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):
        
    endpoint_op = gcc_aip.EndpointCreateOp(
        project=project,
        location=gcp_region,
        display_name="train-automl-beans",
    )
        
    #deploys model to an endpoint
    gcc_aip.ModelDeployOp(
        model=training_op.outputs["model"],
        endpoint=endpoint_op.outputs["endpoint"],
        min_replica_count=1,
        max_replica_count=1,
        machine_type="n1-standard-4",
        )
   

     compiler.Compiler().compile(
        pipeline_func = pipeline, package_path = "tab_classif_pipeline.json"
    )

    #run pipeline
    response = api_client.create_run_from_job_spec(
        "tab_classif_pipeline.json", pipeline_root = PIPELINE_ROOT,
        parameter_values = {
        "project" : PROJECT_ID,
        "display_name" : DISPLAY_NAME
        }
    )

Solution

  • I solved this issue by adding the location to the TabularDatasetCreateJob:

        dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project,
        display_name=display_name, 
        bq_source=bq_source,
        location = gcp_region
    )
    

    I now have the same issue with the model training job but I have learnt that a lot of the functions in the above code take a location parameter, or default to us-central1. I will update if I get any further.