Search code examples
google-cloud-platform, google-ai-platform

Creating metadata store for Google Cloud AI Platform?


I'm trying to run a simple, basic Vertex AI pipeline, and when I upload the compiled pipeline JSON file, I get this error:

Failed to create pipeline job. Error: Permission 'aiplatform.metadataStores.get' denied on resource '//aiplatform.googleapis.com/projects/399668206801/locations/us-central1/metadataStores/default' (or it may not exist).

I can't find how to create one - the docs (https://cloud.google.com/vertex-ai/docs/ml-metadata/configure) say it will be created for me. What do I need to do next?

** EDIT 1 ** I tried doing the same in the UI and got a similar error:

UI upload error result

** EDIT 2 ** Here's the json:

{
  "pipelineSpec": {
    "components": {
      "comp-run-info-fn": {
        "executorLabel": "exec-run-info-fn",
        "inputDefinitions": {
          "parameters": {
            "run_id": {
              "type": "STRING"
            }
          }
        },
        "outputDefinitions": {
          "parameters": {
            "run_info": {
              "type": "STRING"
            }
          }
        }
      },
      "comp-same-step-000-4538957a762e4c2ea30bb0f819345e25-fn": {
        "executorLabel": "exec-same-step-000-4538957a762e4c2ea30bb0f819345e25-fn",
        "inputDefinitions": {
          "parameters": {
            "input_context_path": {
              "type": "STRING"
            },
            "metadata_url": {
              "type": "STRING"
            },
            "run_info": {
              "type": "STRING"
            }
          }
        },
        "outputDefinitions": {
          "parameters": {
            "output_context_path": {
              "type": "STRING"
            },
            "output_context_path_2": {
              "type": "STRING"
            }
          }
        }
      },
      "comp-same-step-001-4847e5a4edc84257aa4ff6fe8aa0159b-fn": {
        "executorLabel": "exec-same-step-001-4847e5a4edc84257aa4ff6fe8aa0159b-fn",
        "inputDefinitions": {
          "parameters": {
            "input_context_path": {
              "type": "STRING"
            },
            "metadata_url": {
              "type": "STRING"
            },
            "run_info": {
              "type": "STRING"
            }
          }
        },
        "outputDefinitions": {
          "parameters": {
            "output_context_path": {
              "type": "STRING"
            }
          }
        }
      },
      "comp-same-step-002-5cefd94e0a9c49cdb81a6c11a5c84ac9-fn": {
        "executorLabel": "exec-same-step-002-5cefd94e0a9c49cdb81a6c11a5c84ac9-fn",
        "inputDefinitions": {
          "parameters": {
            "input_context_path": {
              "type": "STRING"
            },
            "metadata_url": {
              "type": "STRING"
            },
            "run_info": {
              "type": "STRING"
            }
          }
        },
        "outputDefinitions": {
          "parameters": {
            "output_context_path": {
              "type": "STRING"
            }
          }
        }
      }
    },
    "deploymentSpec": {
      "executors": {
        "exec-run-info-fn": {
          "container": {
            "args": [
              "--executor_input",
              "{{$}}",
              "--function_to_execute",
              "run_info_fn"
            ],
            "command": [
              "sh",
              "-c",
              "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet     --no-warn-script-location 'kfp' 'dill' 'kfp==1.8.12' && \"$0\" \"$@\"\n",
              "sh",
              "-ec",
              "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main                         --component_module_path                         \"$program_path/ephemeral_component.py\"                         \"$@\"\n",
              "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef run_info_fn(\n    run_id: str,\n) -> NamedTuple(\"RunInfoOutput\", [(\"run_info\", str),]):\n    from base64 import urlsafe_b64encode\n    from collections import namedtuple\n    import datetime\n    import base64\n    import dill\n    import kfp\n\n    client = kfp.Client(host=\"http://ml-pipeline:8888\")\n    run_info = client.get_run(run_id=run_id)\n\n    run_info_dict = {\n        \"run_id\": run_info.run.id,\n        \"name\": run_info.run.name,\n        \"created_at\": run_info.run.created_at.isoformat(),\n        \"pipeline_id\": run_info.run.pipeline_spec.pipeline_id,\n    }\n\n    # Track kubernetes resources associated wth the run.\n    for r in run_info.run.resource_references:\n        run_info_dict[f\"{r.key.type.lower()}_id\"] = r.key.id\n\n    # Base64-encoded as value is visible in kubeflow ui.\n    output = urlsafe_b64encode(dill.dumps(run_info_dict))\n\n    return namedtuple(\"RunInfoOutput\", [\"run_info\"])(str(output, encoding=\"ascii\"))\n\n"
            ],
            "image": "python:3.7"
          }
        },
        "exec-same-step-000-4538957a762e4c2ea30bb0f819345e25-fn": {
          "container": {
            "args": [
              "--executor_input",
              "{{$}}",
              "--function_to_execute",
              "same_step_000_4538957a762e4c2ea30bb0f819345e25_fn"
            ],
            "command": [
              "sh",
              "-c",
              "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet     --no-warn-script-location 'dill' 'requests' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' 'kfp==1.8.12' && \"$0\" \"$@\"\n",
              "sh",
              "-ec",
              "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main                         --component_module_path                         \"$program_path/ephemeral_component.py\"                         \"$@\"\n",
              "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef same_step_000_4538957a762e4c2ea30bb0f819345e25_fn(\n    input_context_path: InputPath(str),\n    output_context_path: OutputPath(str),\n    run_info: str,\n    metadata_url: str,\n) -> NamedTuple(\"StepOutput\", [(\"output_context_path\", str),]):\n    from base64 import urlsafe_b64encode, urlsafe_b64decode\n    from pathlib import Path\n    import datetime\n    import requests\n    import tempfile\n    import dill\n    import os\n    from collections import namedtuple\n\n    # run_info = \"gAR9lC4=\"\n    # metadata_url = \"\"\n\n    input_context = None\n    with Path(input_context_path.path).open(\"rb\") as reader:\n        input_context = reader.read()\n\n    # Helper function for posting metadata to mlflow.\n    def post_metadata(json):\n        if metadata_url == \"\":\n            return\n\n        try:\n            req = requests.post(metadata_url, json=json)\n            req.raise_for_status()\n        except requests.exceptions.HTTPError as err:\n            print(f\"Error posting metadata: {err}\")\n\n    # Move to writable directory as user might want to do file IO.\n    # TODO: won't persist across steps, might need support in SDK?\n    os.chdir(tempfile.mkdtemp())\n\n    # Load information about the current experiment run:\n    run_info = dill.loads(urlsafe_b64decode(run_info))\n\n    # Post session context to mlflow.\n    if len(input_context) > 0:\n        input_context_str = urlsafe_b64encode(input_context)\n        post_metadata(\n            {\n                \"experiment_id\": run_info[\"experiment_id\"],\n                \"run_id\": run_info[\"run_id\"],\n                \"step_id\": \"same_step_000\",\n                \"metadata_type\": \"input\",\n                \"metadata_value\": input_context_str,\n                \"metadata_time\": datetime.datetime.now().isoformat(),\n            }\n        )\n\n    # User code for step, which we 
run in its own execution frame.\n    user_code = f\"\"\"\nimport dill\n\n# Load session context into global namespace:\nif { len(input_context) } > 0:\n    dill.load_session(\"{ input_context_path }\")\n\n{dill.loads(urlsafe_b64decode(\"gASVTQIAAAAAAABYRgIAAGRhdGFzZXQgPSAnc2FtcGxlX2RhdGEnCmdwdV90eXBlID0gJ0ExMDAnCmltcG9ydCB0ZW5zb3JmbG93CmltcG9ydCBkYXRldGltZQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCmEgPSAxMApiID0gYSArIDUgIzE1CmZyb20gSVB5dGhvbi5kaXNwbGF5IGltcG9ydCBJbWFnZQoKdXJsID0gJ2h0dHBzOi8vcmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbS9TQU1FLVByb2plY3QvU0FNRS1zYW1wbGVzL21haW4vdGVzdC1hcnRpZmFjdHMvRmFyb2VJc2xhbmRzLmpwZWcnCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIzI1Cgpmcm9tIElQeXRob24gaW1wb3J0IGRpc3BsYXkKZGlzcGxheS5JbWFnZSh1cmwpCgppbXBvcnQgcGxvdGx5CgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKZGVmIHNvbWVfbWF0aCh4LCB6KSAtPiB0dXBsZToKICAgIHJldHVybiAocm91bmQoeCArIHosIDIpLCByb3VuZCh4IC8geiwgMikpCgphID0gYSAqIDIwCmIgPSBiICogMTAwICMyNTAwCgpwcmludChmIkIgPSB7Yn0iKZQu\"))}\n\n# Remove anything from the global namespace that cannot be serialised.\n# TODO: this will include things like pandas dataframes, needs sdk support?\n_bad_keys = []\n_all_keys = list(globals().keys())\nfor k in _all_keys:\n    try:\n        dill.dumps(globals()[k])\n    except TypeError:\n        _bad_keys.append(k)\n\nfor k in _bad_keys:\n    del globals()[k]\n\n# Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n    # Runs the user code in a new execution frame. 
Context from the previous\n    # component in the run is loaded into the session dynamically, and we run\n    # with a single globals() namespace to simulate top-level execution.\n    exec(user_code, globals(), globals())\n\n    # Post new session context to mlflow:\n    with Path(output_context_path).open(\"rb\") as reader:\n        context = urlsafe_b64encode(reader.read())\n        post_metadata(\n            {\n                \"experiment_id\": run_info[\"experiment_id\"],\n                \"run_id\": run_info[\"run_id\"],\n                \"step_id\": \"same_step_000\",\n                \"metadata_type\": \"output\",\n                \"metadata_value\": context,\n                \"metadata_time\": datetime.datetime.now().isoformat(),\n            }\n        )\n\n    return namedtuple(\"StepOutput\", [\"output_context_path\"])(str(output_context_path, encoding=\"ascii\"))\n\n"
            ],
            "image": "library/python:3.9-slim-buster"
          }
        },
        "exec-same-step-001-4847e5a4edc84257aa4ff6fe8aa0159b-fn": {
          "container": {
            "args": [
              "--executor_input",
              "{{$}}",
              "--function_to_execute",
              "same_step_001_4847e5a4edc84257aa4ff6fe8aa0159b_fn"
            ],
            "command": [
              "sh",
              "-c",
              "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet     --no-warn-script-location 'dill' 'requests' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' 'kfp==1.8.12' && \"$0\" \"$@\"\n",
              "sh",
              "-ec",
              "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main                         --component_module_path                         \"$program_path/ephemeral_component.py\"                         \"$@\"\n",
              "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef same_step_001_4847e5a4edc84257aa4ff6fe8aa0159b_fn(\n    input_context_path: InputPath(str),\n    output_context_path: OutputPath(str),\n    run_info: str = \"gAR9lC4=\",\n    metadata_url: str = \"\",\n):\n    from base64 import urlsafe_b64encode, urlsafe_b64decode\n    from pathlib import Path\n    import datetime\n    import requests\n    import tempfile\n    import dill\n    import os\n\n    input_context = None\n    with Path(input_context_path.path).open(\"rb\") as reader:\n        input_context = reader.read()\n\n    # Helper function for posting metadata to mlflow.\n    def post_metadata(json):\n        if metadata_url == \"\":\n            return\n\n        try:\n            req = requests.post(metadata_url, json=json)\n            req.raise_for_status()\n        except requests.exceptions.HTTPError as err:\n            print(f\"Error posting metadata: {err}\")\n\n    # Move to writable directory as user might want to do file IO.\n    # TODO: won't persist across steps, might need support in SDK?\n    os.chdir(tempfile.mkdtemp())\n\n    # Load information about the current experiment run:\n    run_info = dill.loads(urlsafe_b64decode(run_info))\n\n    # Post session context to mlflow.\n    if len(input_context) > 0:\n        input_context_str = urlsafe_b64encode(input_context)\n        post_metadata(\n            {\n                \"experiment_id\": run_info[\"experiment_id\"],\n                \"run_id\": run_info[\"run_id\"],\n                \"step_id\": \"same_step_001\",\n                \"metadata_type\": \"input\",\n                \"metadata_value\": input_context_str,\n                \"metadata_time\": datetime.datetime.now().isoformat(),\n            }\n        )\n\n    # User code for step, which we run in its own execution frame.\n    user_code = f\"\"\"\nimport dill\n\n# Load session context into global namespace:\nif { len(input_context) 
} > 0:\n    dill.load_session(\"{ input_context_path }\")\n\n{dill.loads(urlsafe_b64decode(\"gASVVwIAAAAAAABYUAIAAGltcG9ydCBudW1weSBhcyBucAppbXBvcnQgbWF0cGxvdGxpYi5weXBsb3QgYXMgcGx0CmltcG9ydCBzY2lweS5zdGF0cyBhcyBzdGF0cwoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCm11ID0gMApzdGQgPSAxCgp4ID0gbnAubGluc3BhY2Uoc3RhcnQ9LTQsIHN0b3A9NCwgbnVtPTEwMCkKeSA9IHN0YXRzLm5vcm0ucGRmKHgsIG11LCBzdGQpCgphID0gYSArIDUKYiA9IGIgKyAxMCAjIDI1MTUgCgpwbHQucGxvdCh4LCB5KQpwbHQuc2hvdygpCmltcG9ydCByZXF1ZXN0cwppbXBvcnQgcGFuZGFzIGFzIHBkCmltcG9ydCBwbG90bHkuZmlndXJlX2ZhY3RvcnkgYXMgZmYKaW1wb3J0IGNoYXJ0X3N0dWRpby5wbG90bHkgYXMgcHkKCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCgp1cmwgPSAnaHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL1NBTUUtUHJvamVjdC9TQU1FLXNhbXBsZXMvbWFpbi90ZXN0LWFydGlmYWN0cy90ZXN0LmNzdicKZGYgPSBwZC5yZWFkX2Nzdih1cmwpCgphID0gYSAqIDEwMDAKYiA9IGIgLyA2NyAjIDM3LjUzNzMxMzQzMjgKCmRmLmRlc2NyaWJlKCmULg==\"))}\n\n# Remove anything from the global namespace that cannot be serialised.\n# TODO: this will include things like pandas dataframes, needs sdk support?\n_bad_keys = []\n_all_keys = list(globals().keys())\nfor k in _all_keys:\n    try:\n        dill.dumps(globals()[k])\n    except TypeError:\n        _bad_keys.append(k)\n\nfor k in _bad_keys:\n    del globals()[k]\n\n# Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n    # Runs the user code in a new execution frame. 
Context from the previous\n    # component in the run is loaded into the session dynamically, and we run\n    # with a single globals() namespace to simulate top-level execution.\n    exec(user_code, globals(), globals())\n\n    # Post new session context to mlflow:\n    with Path(output_context_path).open(\"rb\") as reader:\n        context = urlsafe_b64encode(reader.read())\n        post_metadata(\n            {\n                \"experiment_id\": run_info[\"experiment_id\"],\n                \"run_id\": run_info[\"run_id\"],\n                \"step_id\": \"same_step_001\",\n                \"metadata_type\": \"output\",\n                \"metadata_value\": context,\n                \"metadata_time\": datetime.datetime.now().isoformat(),\n            }\n        )\n\n"
            ],
            "image": "library/python:3.9-slim-buster"
          }
        },
        "exec-same-step-002-5cefd94e0a9c49cdb81a6c11a5c84ac9-fn": {
          "container": {
            "args": [
              "--executor_input",
              "{{$}}",
              "--function_to_execute",
              "same_step_002_5cefd94e0a9c49cdb81a6c11a5c84ac9_fn"
            ],
            "command": [
              "sh",
              "-c",
              "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet     --no-warn-script-location 'dill' 'requests' 'chart_studio' 'ipython' 'matplotlib' 'numpy' 'pandas' 'plotly' 'Requests' 'scipy' 'tensorflow' 'kfp==1.8.12' && \"$0\" \"$@\"\n",
              "sh",
              "-ec",
              "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main                         --component_module_path                         \"$program_path/ephemeral_component.py\"                         \"$@\"\n",
              "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef same_step_002_5cefd94e0a9c49cdb81a6c11a5c84ac9_fn(\n    input_context_path: InputPath(str),\n    output_context_path: OutputPath(str),\n    run_info: str = \"gAR9lC4=\",\n    metadata_url: str = \"\",\n):\n    from base64 import urlsafe_b64encode, urlsafe_b64decode\n    from pathlib import Path\n    import datetime\n    import requests\n    import tempfile\n    import dill\n    import os\n\n    input_context = None\n    with Path(input_context_path.path).open(\"rb\") as reader:\n        input_context = reader.read()\n\n    # Helper function for posting metadata to mlflow.\n    def post_metadata(json):\n        if metadata_url == \"\":\n            return\n\n        try:\n            req = requests.post(metadata_url, json=json)\n            req.raise_for_status()\n        except requests.exceptions.HTTPError as err:\n            print(f\"Error posting metadata: {err}\")\n\n    # Move to writable directory as user might want to do file IO.\n    # TODO: won't persist across steps, might need support in SDK?\n    os.chdir(tempfile.mkdtemp())\n\n    # Load information about the current experiment run:\n    run_info = dill.loads(urlsafe_b64decode(run_info))\n\n    # Post session context to mlflow.\n    if len(input_context) > 0:\n        input_context_str = urlsafe_b64encode(input_context)\n        post_metadata(\n            {\n                \"experiment_id\": run_info[\"experiment_id\"],\n                \"run_id\": run_info[\"run_id\"],\n                \"step_id\": \"same_step_002\",\n                \"metadata_type\": \"input\",\n                \"metadata_value\": input_context_str,\n                \"metadata_time\": datetime.datetime.now().isoformat(),\n            }\n        )\n\n    # User code for step, which we run in its own execution frame.\n    user_code = f\"\"\"\nimport dill\n\n# Load session context into global namespace:\nif { len(input_context) 
} > 0:\n    dill.load_session(\"{ input_context_path }\")\n\n{dill.loads(urlsafe_b64decode(\"gASV9wEAAAAAAABY8AEAAGEgPSBhICsgNQpiID0gYiArIDEwICMgNDcuNTM3MzEzNDMyOApwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQpnID0gc29tZV9tYXRoKDgsIDIxKQpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQpqID0gZ1swXQprID0gZ1sxXQoKcHJpbnQoZiJUaW1lOiB7ZGF0ZXRpbWUuZGF0ZXRpbWUubm93KCl9IikKCmEgPSBhICsgNQpiID0gYiArIDEwICMgNTcuNTM3MzEzNDMyOAoKcHJpbnQoZiJqOiB7an0iKQpwcmludChmIms6IHtrfSIpCgpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQoKYSA9IGEgKyA1CmIgPSBiICsgMTAgIyA2Ny41MzczMTM0MzI4CnByaW50KCIwLjAuMiIpCnByaW50KGYiVGltZToge2RhdGV0aW1lLmRhdGV0aW1lLm5vdygpfSIpCnByaW50KGYiQWNjZXNzaW5nIHRoZSB2YWx1ZSBvZiBCOiB7Yn0iKQpwcmludChmIlRpbWU6IHtkYXRldGltZS5kYXRldGltZS5ub3coKX0iKQqULg==\"))}\n\n# Remove anything from the global namespace that cannot be serialised.\n# TODO: this will include things like pandas dataframes, needs sdk support?\n_bad_keys = []\n_all_keys = list(globals().keys())\nfor k in _all_keys:\n    try:\n        dill.dumps(globals()[k])\n    except TypeError:\n        _bad_keys.append(k)\n\nfor k in _bad_keys:\n    del globals()[k]\n\n# Save new session context to disk for the next component:\ndill.dump_session(\"{output_context_path}\")\n\"\"\"\n\n    # Runs the user code in a new execution frame. 
Context from the previous\n    # component in the run is loaded into the session dynamically, and we run\n    # with a single globals() namespace to simulate top-level execution.\n    exec(user_code, globals(), globals())\n\n    # Post new session context to mlflow:\n    with Path(output_context_path).open(\"rb\") as reader:\n        context = urlsafe_b64encode(reader.read())\n        post_metadata(\n            {\n                \"experiment_id\": run_info[\"experiment_id\"],\n                \"run_id\": run_info[\"run_id\"],\n                \"step_id\": \"same_step_002\",\n                \"metadata_type\": \"output\",\n                \"metadata_value\": context,\n                \"metadata_time\": datetime.datetime.now().isoformat(),\n            }\n        )\n\n"
            ],
            "image": "library/python:3.9-slim-buster"
          }
        }
      }
    },
    "pipelineInfo": {
      "name": "root-pipeline-compilation"
    },
    "root": {
      "dag": {
        "tasks": {
          "run-info-fn": {
            "cachingOptions": {
              "enableCache": true
            },
            "componentRef": {
              "name": "comp-run-info-fn"
            },
            "inputs": {
              "parameters": {
                "run_id": {
                  "runtimeValue": {
                    "constantValue": {
                      "stringValue": "{{workflow.uid}}"
                    }
                  }
                }
              }
            },
            "taskInfo": {
              "name": "run-info-fn"
            }
          },
          "same-step-000-4538957a762e4c2ea30bb0f819345e25-fn": {
            "cachingOptions": {
              "enableCache": true
            },
            "componentRef": {
              "name": "comp-same-step-000-4538957a762e4c2ea30bb0f819345e25-fn"
            },
            "dependentTasks": [
              "run-info-fn"
            ],
            "inputs": {
              "parameters": {
                "input_context_path": {
                  "runtimeValue": {
                    "constantValue": {
                      "stringValue": ""
                    }
                  }
                },
                "metadata_url": {
                  "componentInputParameter": "metadata_url"
                },
                "run_info": {
                  "taskOutputParameter": {
                    "outputParameterKey": "run_info",
                    "producerTask": "run-info-fn"
                  }
                }
              }
            },
            "taskInfo": {
              "name": "same-step-000-4538957a762e4c2ea30bb0f819345e25-fn"
            }
          },
          "same-step-001-4847e5a4edc84257aa4ff6fe8aa0159b-fn": {
            "cachingOptions": {
              "enableCache": true
            },
            "componentRef": {
              "name": "comp-same-step-001-4847e5a4edc84257aa4ff6fe8aa0159b-fn"
            },
            "dependentTasks": [
              "run-info-fn",
              "same-step-000-4538957a762e4c2ea30bb0f819345e25-fn"
            ],
            "inputs": {
              "parameters": {
                "input_context_path": {
                  "taskOutputParameter": {
                    "outputParameterKey": "output_context_path",
                    "producerTask": "same-step-000-4538957a762e4c2ea30bb0f819345e25-fn"
                  }
                },
                "metadata_url": {
                  "componentInputParameter": "metadata_url"
                },
                "run_info": {
                  "taskOutputParameter": {
                    "outputParameterKey": "run_info",
                    "producerTask": "run-info-fn"
                  }
                }
              }
            },
            "taskInfo": {
              "name": "same-step-001-4847e5a4edc84257aa4ff6fe8aa0159b-fn"
            }
          },
          "same-step-002-5cefd94e0a9c49cdb81a6c11a5c84ac9-fn": {
            "cachingOptions": {
              "enableCache": true
            },
            "componentRef": {
              "name": "comp-same-step-002-5cefd94e0a9c49cdb81a6c11a5c84ac9-fn"
            },
            "dependentTasks": [
              "run-info-fn",
              "same-step-001-4847e5a4edc84257aa4ff6fe8aa0159b-fn"
            ],
            "inputs": {
              "parameters": {
                "input_context_path": {
                  "taskOutputParameter": {
                    "outputParameterKey": "output_context_path",
                    "producerTask": "same-step-001-4847e5a4edc84257aa4ff6fe8aa0159b-fn"
                  }
                },
                "metadata_url": {
                  "componentInputParameter": "metadata_url"
                },
                "run_info": {
                  "taskOutputParameter": {
                    "outputParameterKey": "run_info",
                    "producerTask": "run-info-fn"
                  }
                }
              }
            },
            "taskInfo": {
              "name": "same-step-002-5cefd94e0a9c49cdb81a6c11a5c84ac9-fn"
            }
          }
        }
      },
      "inputDefinitions": {
        "parameters": {
          "context": {
            "type": "STRING"
          },
          "metadata_url": {
            "type": "STRING"
          }
        }
      }
    },
    "schemaVersion": "2.0.0",
    "sdkVersion": "kfp-1.8.12"
  },
  "runtimeConfig": {
    "parameters": {
      "context": {
        "stringValue": ""
      },
      "metadata_url": {
        "stringValue": ""
      }
    }
  }
}

And here's the python to attempt to run:

    from google.cloud import aiplatform

    job = aiplatform.PipelineJob(display_name="MY_DISPLAY_JOB", template_path=compiled_pipeline_path, project=project_id, pipeline_root=pipeline_root)


Solution

  • Suppose you are using a Vertex AI Notebook to run pipeline jobs.

    The service account of the Notebook (which is actually a Compute Engine instance) should have the following roles (reference A; reference B):

    • roles/aiplatform.user;
    • roles/storage.objectViewer;
    • roles/storage.objectCreator;
    • roles/iam.serviceAccountUser

    One way to grant them is from Cloud Shell (not the notebook terminal!), running the following command once for each role:

    gcloud projects add-iam-policy-binding <your_project_id> \
    --member=serviceAccount:"<your_serviceaccount_email>" \
    --role="roles/aiplatform.user"
    

    The service account email looks like:

    <project_number>-compute@developer.gserviceaccount.com
    

    P.S., keep in mind that the service account must be given as its email address, not as the Unique ID of the service account.

    P.P.S., you might get the error:

    Policy members must be of the form "<type>:<value>"

    If so, it is because --member requires serviceAccount:<service_account_name> or user:<user_name>, not the account name alone.