Search code examples
google-cloud-platform cloud airflow google-cloud-dataproc google-cloud-composer

How to pass the Dataproc Metastore (DPMS) config to DataprocClusterCreateOperator


I am trying to pass the Dataproc Metastore (DPMS) config to DataprocClusterCreateOperator, but I cannot find the proper way to pass this configuration. Can anyone show me how to pass the DPMS config to DataprocClusterCreateOperator?


Solution

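  • You can use the following code, which is based on this documentation. The metastore is passed through the "metastore_config" key of the cluster config; note that the example imports the operator under the name DataprocCreateClusterOperator: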

    import os
    from datetime import datetime
    
    from airflow import models
    from airflow.providers.google.cloud.operators.dataproc import (DataprocCreateClusterOperator,ClusterGenerator)
    from airflow.utils.trigger_rule import TriggerRule
    
    # NOTE(review): commented-out and unused in this snippet; the name looks like
    # a truncated "ENV_ID" -- confirm or delete.
    #NV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
    
    # Identifier passed to models.DAG as the DAG's id.
    DAG_ID = "dataproc_hive"
    
    # Placeholder: replace with your project ID (passed as project_id to the
    # create-cluster operator).
    PROJECT_ID = "<your-proj-id>"
    
    # Placeholder: replace with the name for the new cluster (passed as
    # cluster_name to the create-cluster operator).
    CLUSTER_NAME = "<your-cluster>"
    
    # Region passed to the create-cluster operator.
    REGION = "us-central1"
    
    # Cluster definition
    # [START how_to_cloud_dataproc_create_cluster]
    
    def _instance_group(num_instances):
        """Return the config for one group of n1-standard-4 instances.

        Every call builds fresh dicts, so the master and worker entries never
        share a mutable ``disk_config``.
        """
        return {
            "num_instances": num_instances,
            "machine_type_uri": "n1-standard-4",
            "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
        }


    # One master, two workers, and the Dataproc Metastore service to attach to
    # the cluster.
    CLUSTER_CONFIG = {
        "master_config": _instance_group(1),
        "worker_config": _instance_group(2),
        "metastore_config": {
            # Placeholder: replace with the resource name of your metastore
            # service. The Dataproc API documents the form
            # projects/<project>/locations/<region>/services/<service>.
            "dataproc_metastore_service": "<path-to-your-metastore-service>/services/<your-metastore-service>",
        },
    }
    
    
    # A DAG scheduled "@once" (no catch-up runs) that contains a single task:
    # create the Dataproc cluster described by CLUSTER_CONFIG.
    with models.DAG(
        dag_id=DAG_ID,
        start_date=datetime(year=2022, month=7, day=7),
        schedule_interval="@once",
        catchup=False,
        tags=["example", "dataproc"],
    ) as dag:
        # The metastore settings reach the operator through cluster_config.
        create_cluster = DataprocCreateClusterOperator(
            task_id="create_cluster",
            project_id=PROJECT_ID,
            region=REGION,
            cluster_name=CLUSTER_NAME,
            cluster_config=CLUSTER_CONFIG,
        )

    # Only one task exists, so there are no dependencies to declare; this bare
    # reference has no effect.
    create_cluster
    

    Created: enter image description here

    Logs: enter image description here