Tags: python-3.x, google-cloud-platform, google-cloud-vertex-ai, kubeflow, kubeflow-pipelines

Change uri name of a KFP artifact to .csv instead of "dataset"


When I output a Kubeflow Output[Dataset] that I know will be in CSV format, the output URI ends with the text dataset.

Is it possible to change the URI so that it ends in dataset.csv, or in training_data.csv? The full URI for the artifact is currently gs://<bucket-name>/<id>/<pipeline-name>+<id>/<script-name>+<id>/dataset.

This is the code I am executing.

from kfp.dsl import Dataset, Output, component  # kfp.v2.dsl when using the v1 SDK


@component(packages_to_install=["google-cloud-bigquery"])  # decorator/packages assumed; adjust to your setup
def extract_bq_to_dataset(
    bq_client_project_id: str,
    source_project_id: str,
    dataset_id: str,
    table_name: str,
    dataset: Output[Dataset],
    dataset_location: str = "EU",
    extract_job_config: dict = None,
):
    from google.cloud import bigquery
    from google.cloud.exceptions import GoogleCloudError

    full_table_id = f"{source_project_id}.{dataset_id}.{table_name}"
    table = bigquery.table.Table(table_ref=full_table_id)

    if extract_job_config is None:
        extract_job_config = {}
    job_config = bigquery.job.ExtractJobConfig(**extract_job_config)

    client = bigquery.Client(project=bq_client_project_id, location=dataset_location)
    extract_job = client.extract_table(
        source=table,
        # Workaround: append ".csv" to the URI assigned by KFP so the
        # exported file carries a .csv extension.
        destination_uris=dataset.uri + ".csv",
        job_config=job_config,
        location=dataset_location,
    )
    print("Information about the artifact:")
    print("Name:", dataset.name)
    print("URI:", dataset.uri)
    print("Path:", dataset.path)
    print("Metadata:", dataset.metadata)

    try:
        # Block until the extract job finishes; surfaces any BigQuery errors.
        result = extract_job.result()
    except GoogleCloudError as e:
        raise e

My workaround for now is to append ".csv" to dataset.uri. However, this breaks references later in the program, since I can no longer read the data via train_data.path but instead have to write train_data.path + ".csv". I would like to keep using the KFP framework's ability to dynamically assign names, paths, and URIs, so there is probably a better way to solve this than manually adding a .csv suffix on the extract_job = client.extract_table(...) line, as I'm doing now.
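For illustration, this is roughly what the workaround forces on a downstream component (a minimal sketch, assuming the KFP v2 SDK; the train_model component and the pandas usage are hypothetical stand-ins for whatever consumes the data):

from kfp.dsl import Dataset, Input, component


@component(packages_to_install=["pandas"])
def train_model(train_data: Input[Dataset]):
    import pandas as pd

    # Because the producer wrote to dataset.uri + ".csv", the path that
    # KFP hands to this component no longer points at the real file, so
    # the suffix has to be re-appended here as well.
    df = pd.read_csv(train_data.path + ".csv")
    print(df.shape)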


Solution

  • After reading more about Kubeflow Pipelines (pipelines/component-development), my understanding is that it is recommended to let the Kubeflow pipeline handle the data passing between components.

    Modifying the URI is not recommended, because every component that reads the output of a component producing an artifact with a modified URI then has to adjust the artifact path before it can read the data.

    To communicate that the output is in CSV format, the artifact output name can instead be changed accordingly, e.g. rename dataset to csv_dataset in the code above; a minimal sketch follows below.
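A minimal sketch of that approach (assuming the KFP v2 SDK; the component names and the toy DataFrame are only for illustration):

from kfp.dsl import Dataset, Input, Output, component


@component(packages_to_install=["pandas"])
def extract_to_csv_dataset(csv_dataset: Output[Dataset]):
    import pandas as pd

    # The output is simply renamed: the artifact is still written to the
    # URI that KFP assigns (which now ends in "csv_dataset"), so nothing
    # downstream has to juggle suffixes.
    pd.DataFrame({"a": [1, 2, 3]}).to_csv(csv_dataset.path, index=False)
    csv_dataset.metadata["format"] = "csv"  # optionally record the format


@component(packages_to_install=["pandas"])
def consume_csv_dataset(csv_dataset: Input[Dataset]):
    import pandas as pd

    # The framework-provided path can be used directly, without ".csv".
    df = pd.read_csv(csv_dataset.path)
    print(df.head())

Downstream components keep reading the framework-assigned .path directly, and the output name (plus optional metadata) documents that the content is CSV.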