
Is there a way to pass the Dataproc runtime version via the Airflow DataprocCreateBatchOperator method?


I've just run into an issue where the default runtime version of Dataproc Serverless was upgraded and broke my job, which is submitted using the DataprocCreateBatchOperator method through Airflow.

    task2 = DataprocCreateBatchOperator(
        task_id="trip_level_data",
        project_id="generic_project_id",
        region="us-east4",
        batch_id="trip-" + "".join(random.choice(string.ascii_lowercase + string.digits) for i in range(35)),
        batch={
            "pyspark_batch": {
                "args": [
                    "--env=prod"
                ],
                "jar_file_uris": [
                    "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.0.jar"
                ],
                "main_python_file_uri": "gs://generic_bucket/get_trip_info.py",
            },
            "labels": {
                "dag_id": "{{ run_id_format(dag.dag_id) }}",
                "dag_run_id": "{{ run_id_format(run_id) }}",
                "task_id": "{{ run_id_format(task.task_id) }}",
            },
            "environment_config": {
                "execution_config": {
                    "service_account": "svc@generic_project.iam.gserviceaccount.com",
                    "subnetwork_uri": "https://www.googleapis.com/compute/alpha/projects/shared-vpc-admin/regions/us-east4/subnetworks/prod-us-east4-01",
                }
            },
        },
    )

I don't see anywhere in the documentation how to provide the version number to the operator when creating it, though. If run via the gcloud command line, I can use --version=2.0.85 to submit the job manually and it works fine.


Solution

  • Yes, you need to use runtime_config and specify the "version" field. You can specify just the major version, e.g. "2.0", to run the job with 2.0.85, which is the latest available release for 2.0.

    Refer to this sample batch configuration (SUBNET, KMS_KEY, METASTORE_SERVICE, and PHS_SERVER are placeholder variables):

        batch={
            "environment_config": {
                "execution_config": {
                    "subnetwork_uri": SUBNET,
                    "kms_key": KMS_KEY,
                },
                "peripherals_config": {
                    "metastore_service": METASTORE_SERVICE,
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_SERVER,
                    },
                },
            },
            "runtime_config": {"version": "2.0"},
        }
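    Applied to the job from the question, a minimal sketch of pinning the runtime by adding a runtime_config block to the existing batch payload (the paths and service-account values below are the question's own placeholders, trimmed for brevity):

    ```python
    # Sketch: pin the Dataproc Serverless runtime for the batch payload
    # from the question. Placeholder values come from the question itself.
    batch = {
        "pyspark_batch": {
            "args": ["--env=prod"],
            "main_python_file_uri": "gs://generic_bucket/get_trip_info.py",
        },
        "environment_config": {
            "execution_config": {
                "service_account": "svc@generic_project.iam.gserviceaccount.com",
            },
        },
    }

    # Pin the major runtime version; "2.0" resolves to the latest 2.0.x patch.
    batch["runtime_config"] = {"version": "2.0"}

    print(batch["runtime_config"]["version"])  # -> 2.0
    ```

    The same dict is then passed as the batch argument of DataprocCreateBatchOperator, so no operator-level parameter is needed for the version.
    
    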