python, google-cloud-platform, airflow, google-cloud-dataproc

Component Gateway with DataprocOperator on Airflow


In GCP it is fairly simple to install and run a JupyterHub component from the UI or the gcloud command. I'm trying to script the process through Airflow and the DataprocClusterCreateOperator; here is an extract of the DAG:

from airflow.contrib.operators import dataproc_operator  

create_cluster=dataproc_operator.DataprocClusterCreateOperator(
        task_id='create-' + CLUSTER_NAME, 
        cluster_name=CLUSTER_NAME,
        project_id=PROJECT_ID,
        num_workers=3,
        num_masters=1,
        master_machine_type='n1-standard-2',
        worker_machine_type='n1-standard-2',
        master_disk_size=100,
        worker_disk_size=100,
        storage_bucket='test-dataproc-jupyter', 
        region='europe-west4', 
        zone='europe-west4-a',
        auto_delete_ttl=21600, 
        optional_components=['JUPYTER', 'ANACONDA']
    )

However, I cannot manage to specify the needed enable-component-gateway parameter. Looking at the source code, it seems the parameter is not supported (in both the deprecated and the latest stable operator).

I know the REST API provides endpointConfig.enableHttpPortAccess, but I would rather use the official operator. Does anyone have an idea how to achieve that?
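
For reference, this is roughly the fragment of the clusters.create request body I am referring to. It is only an illustrative sketch: the cluster name below is a placeholder, and only the fields relevant to the Component Gateway and the optional components are shown:

# Illustrative fragment of a Dataproc clusters.create request body (REST field names)
cluster_body = {
    'clusterName': 'test-dataproc-jupyter-cluster',  # placeholder name
    'config': {
        'endpointConfig': {
            'enableHttpPortAccess': True  # what gcloud's --enable-component-gateway sets
        },
        'softwareConfig': {
            'optionalComponents': ['JUPYTER', 'ANACONDA']
        }
    }
}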


Solution

  • Edit: a fix for composer-1.8.3 with airflow-1.10.3

    In Airflow 1.10.3, the cluster configuration cannot be passed in externally. However, we can inherit from the cluster creation operator and override the configuration creation. This also lets us set the optional components, a parameter missing from this Airflow version.

    from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

    class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

        def __init__(self, *args, **kwargs):
            super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

        def _build_cluster_data(self):
            # Build the standard cluster spec, then inject the fields this
            # Airflow version's operator does not expose.
            cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
            # Enable the Component Gateway (endpointConfig.enableHttpPortAccess in the REST API).
            cluster_data['config']['endpointConfig'] = {
                'enableHttpPortAccess': True
            }
            # Optional components are also not settable through the 1.10.3 operator.
            cluster_data['config']['softwareConfig']['optionalComponents'] = ['JUPYTER', 'ANACONDA']
            return cluster_data
    
    # Start Dataproc cluster
    dataproc_cluster = CustomDataprocClusterCreateOperator(
        task_id='create-' + CLUSTER_NAME, 
        cluster_name=CLUSTER_NAME,
        project_id=PROJECT_ID,
        num_workers=3,
        num_masters=1,
        master_machine_type='n1-standard-2',
        worker_machine_type='n1-standard-2',
        master_disk_size=100,
        worker_disk_size=100,
        storage_bucket='test-dataproc-jupyter', 
        region='europe-west4', 
        zone='europe-west4-a',
        auto_delete_ttl=21600, 
        dag=dag
    )
    

    Original answer, for Airflow 1.10.7

    While not optimal, you can create the cluster data structure yourself instead of having Airflow's ClusterGenerator do it. It should work on the latest version (1.10.7):

    cluster = {
      'clusterName': CLUSTER_NAME,
      'config': {
        'gceClusterConfig': {
          'zoneUri': 'europe-west4-a'
        },
        'masterConfig': {
          'numInstances': 1,
          'machineTypeUri': 'n1-standard-2',
          'diskConfig': {
            'bootDiskSizeGb': 100
          },
        },
        'workerConfig': {
          'numInstances': 3,
          'machineTypeUri': 'n1-standard-2',
          'diskConfig': {
            'bootDiskSizeGb': 100
          },
        },
        'softwareConfig': {
          'optionalComponents': [
            'ANACONDA',
            'JUPYTER'
          ]
        },
        'lifecycleConfig': {
          'autoDeleteTtl': '21600s'  # duration string, as the Dataproc REST API expects
        },
        'endpointConfig': {
          'enableHttpPortAccess': True
        }
      },
      'projectId': PROJECT_ID
    }
    # Start Dataproc cluster
    dataproc_cluster = DataprocClusterCreateOperator(
        task_id='create-' + CLUSTER_NAME,
        project_id=PROJECT_ID,
        num_workers=3,
        region='europe-west4',
        zone='europe-west4-a',
        cluster=cluster,
        dag=dag
    )
    

    In case you are using another Airflow version, please specify that.

    You can also vote for the bug I've opened: AIRFLOW-6432