Tags: python, airflow, google-cloud-dataproc, airflow-2.x

Facing an issue passing the metadata field with DataprocCreateClusterOperator (Airflow 2.0)


I am facing issues installing packages on a Dataproc cluster with DataprocCreateClusterOperator while trying to upgrade to Airflow 2.0.

Error Message:

ValueError: metadata was invalid: [('bigquery-connector-version', '1.1.1'), ('spark-bigquery-connector-version', '0.17.2'), ('PIP_PACKAGES', 'oyaml'), ('x-goog-api-client', 'gl-python/3.8.12 grpc/1.39.0 gax/1.31.1 gccl/airflow_v2.1.2+composer')] 

On digging further, I came across a [GitHub pull request](https://github.com/apache/airflow/pull/19446) where this ValueError: metadata was invalid issue is discussed by the community.
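For context on the first error: in the Airflow 2.x Google provider, the operator's metadata parameter is additional metadata attached to the API request (a sequence of key/value pairs), not Compute Engine instance metadata, which is why the connector versions and PIP_PACKAGES show up next to the x-goog-api-client entry in the ValueError. Below is a minimal sketch of the kind of call that produces it, reconstructed from the error message (the variable name and placeholders are illustrative):

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

# Pre-migration style usage - do NOT do this in Airflow 2.x.
# These pairs are attached to the gRPC request, not to the cluster's VMs,
# and the request is rejected client-side with "metadata was invalid".
create_cluster_broken = DataprocCreateClusterOperator(
    task_id="create_test_dataproc_cluster",
    project_id="project_id",
    region="us-central1",
    cluster_name="cluster_name",
    metadata=[  # wrong place for instance metadata
        ("bigquery-connector-version", "1.1.1"),
        ("spark-bigquery-connector-version", "0.17.2"),
        ("PIP_PACKAGES", "oyaml"),
    ],
)

Instance metadata (connector versions, PIP_PACKAGES) has to go into the cluster config instead, for example via ClusterGenerator as in the code further below.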

I followed the link and used the CLUSTER_CONFIG approach (via ClusterGenerator) to build the cluster_config for DataprocCreateClusterOperator, but now I am running into another issue:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 67, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/opt/python3.8/lib/python3.8/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "Compute Engine instance tag '-' must match pattern (?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)"
    debug_error_string = "{"created":"@1640080533.396337969","description":"Error received from peer ipv4:142.250.97.95:443","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Compute Engine instance tag '-' must match pattern (?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)","grpc_status":3}"

I could not find much information about this error.
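The pattern quoted in the INVALID_ARGUMENT error can at least be checked locally. A small sketch (the candidate values are just examples) showing which strings Compute Engine accepts as instance tags or resource names:

import re

# Pattern copied verbatim from the error message above.
GCE_NAME_PATTERN = re.compile(r"(?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)")

for candidate in ["dataproc", "cluster-name", "cluster_name", "-"]:
    status = "valid" if GCE_NAME_PATTERN.fullmatch(candidate) else "invalid"
    print(f"{candidate!r}: {status}")

# "cluster_name" and "-" are invalid: only lowercase letters, digits and
# hyphens are allowed, and the value must start with a letter.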

Here is the code:

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator

CLUSTER_CONFIG = ClusterGenerator(
    project_id="project_id",
    region="us-central1",
    cluster_name="cluster_name",
    tags="dataproc",
    num_workers=2,
    storage_bucket=None,
    num_masters=1,
    master_machine_type="n1-standard-4",
    master_disk_type="pd-standard",
    master_disk_size=1024,
    worker_machine_type="n1-standard-4",
    worker_disk_type="pd-standard",
    worker_disk_size=1024,
    properties={},
    image_version="1.5-ubuntu18",
    autoscaling_policy=None,
    idle_delete_ttl=7200,
    optional_components=['JUPYTER', 'ANACONDA'],
    metadata={"gcs-connector-version" : '2.1.1' , 
                  "bigquery-connector-version": '1.1.1',
                  "spark-bigquery-connector-version": '0.17.2',
                  "PIP_PACKAGES" : 'datalab shap oyaml click apache-airflow apache-airflow-providers-google'
                 },
    init_actions_uris =['gs://goog-dataproc-initialization-actions-{region}/connectors/connectors.sh','gs://goog-dataproc-initialization-actions-{region}/python/pip-install.sh']
).make()

with dag:
    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_test_dataproc_cluster",
        cluster_name="cluster_name",
        project_id="project_id",
        region="us-central1",
        cluster_config=CLUSTER_CONFIG,
    )

    create_dataproc_cluster


Solution

  • The following DAG works as expected. Compared with the original code, the changes are:

    • the cluster name (cluster_name -> cluster-name), so it contains only lowercase letters, digits and hyphens;
    • the paths of the init action scripts, with the region hard-coded instead of the literal {region} placeholder (see the note after the DAG below);
    • the DAG definition (models.DAG is created explicitly and used as a context manager);
    • tags passed as a list (tags=["dataproc"]) rather than a string.

    import os
    from datetime import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator
    from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator
    
    CLUSTER_CONFIG = ClusterGenerator(
        project_id="project_id",
        region="us-central1",
        cluster_name="cluster-name",
        tags=["dataproc"],
        num_workers=2,
        storage_bucket=None,
        num_masters=1,
        master_machine_type="n1-standard-4",
        master_disk_type="pd-standard",
        master_disk_size=1024,
        worker_machine_type="n1-standard-4",
        worker_disk_type="pd-standard",
        worker_disk_size=1024,
        properties={},
        image_version="1.5-ubuntu18",
        autoscaling_policy=None,
        idle_delete_ttl=7200,
        optional_components=['JUPYTER', 'ANACONDA'],
        metadata={"gcs-connector-version" : '2.1.1' , 
                      "bigquery-connector-version": '1.1.1',
                      "spark-bigquery-connector-version": '0.17.2',
                      "PIP_PACKAGES" : 'datalab shap oyaml click apache-airflow apache-airflow-providers-google'
                     },
        init_actions_uris =['gs://goog-dataproc-initialization-actions-us-central1/connectors/connectors.sh','gs://goog-dataproc-initialization-actions-us-central1/python/pip-install.sh']
    ).make()
    
    
    with models.DAG(
        "example_gcp_dataproc",
        schedule_interval='@once',
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        
        create_dataproc_cluster = DataprocCreateClusterOperator(
            task_id="create_test_dataproc_cluster",
            cluster_name="cluster-name",
            project_id="<your-project-name>",
            region="us-central1",
            cluster_config=CLUSTER_CONFIG,
        )

        create_dataproc_cluster
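
  • A note on the script paths: hard-coding the region works, but the {region} placeholder from the original DAG can also be avoided by building the URIs from a single value. A minimal sketch (REGION is just an illustrative constant):

    REGION = "us-central1"

    # Build the init-action URIs from one REGION value so a literal "{region}"
    # placeholder can never end up in the bucket name.
    INIT_ACTIONS_URIS = [
        f"gs://goog-dataproc-initialization-actions-{REGION}/connectors/connectors.sh",
        f"gs://goog-dataproc-initialization-actions-{REGION}/python/pip-install.sh",
    ]

    The resulting list can then be passed to ClusterGenerator as init_actions_uris, exactly as in the DAG above.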