
Airflow - Task Instance in EMR operator


In Airflow, I'm facing an issue where I need to pass the job_flow_id to one of my EMR steps. I can retrieve the job_flow_id from the operator, but when I create the steps to submit to the cluster, the task_instance value is not rendered. I have the following code:

def issue_step(name, args):
    return [
        {
            "Name": name,
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "s3://....",
                "Args": args
            }
        }
    ]

dag = DAG('example',
          description='My dag',
          schedule_interval='0 8 * * 6',
          dagrun_timeout=timedelta(days=2))

try:

    create_emr = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        aws_conn_id='aws_default',        
        dag=dag
    )

    load_data_steps = issue_step('load', ['arg1', 'arg2'])

    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
        "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}") # the value here is not exchanged with the actual job_flow_id

    load_data = EmrAddStepsOperator(
        task_id='load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",  # this is correctly exchanged with the job_flow_id - same for the others
        aws_conn_id='aws_default',
        steps=load_data_steps,
        dag=dag
    )

    check_load_data = EmrStepSensor(
        task_id='watch_load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    create_emr >> load_data
    load_data >> check_load_data
    check_load_data >> cluster_remover

except AirflowException as ae:
    print(ae.message)

The problem is that when I check the EMR console, instead of seeing --cluster-id j-1234 in the load_data step, I see the literal string --cluster-id {{ task_instance.xcom_pull('create_job_flow', key='return_value') }}, which causes my step to fail.

How can I get the actual value inside my step function?

Thanks and happy holidays


Solution

  • I found out that there is a PR on the Airflow repository about this. The issue is that there is no templating for steps in the EmrAddStepsOperator. To overcome this, I did the following:

    • Created a custom operator that inherits from EmrAddStepsOperator
    • Added this operator as Plugin
    • Called the new operator in my DAG file

    Here is the code for the custom operator and the plugin, in the file custom_emr_add_step_operator.py (see the tree below):

    from __future__ import division, absolute_import, print_function
    
    from airflow.plugins_manager import AirflowPlugin
    from airflow.utils import apply_defaults
    
    from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
    
    
    class CustomEmrAddStepsOperator(EmrAddStepsOperator):
        template_fields = ['job_flow_id', 'steps']  # include 'steps' so it is templated as well, solving the issue above
    
        @apply_defaults
        def __init__(
                self,
                *args, **kwargs):
            super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs)
    
        def execute(self, context):
            super(CustomEmrAddStepsOperator, self).execute(context=context)
    
    
    # Defining the plugin class
    class CustomPlugin(AirflowPlugin):
        name = "custom_plugin"
        operators = [CustomEmrAddStepsOperator]
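    To see why overriding template_fields is enough, note that before calling execute, Airflow renders every attribute listed in template_fields with Jinja, recursing into lists and dicts. The sketch below is a deliberately simplified stand-in for that mechanism (it only resolves bare {{ name }} lookups from a context dict, not full Jinja or xcom_pull calls), just to illustrate how a nested steps structure gets its placeholders filled in:

    ```python
    import re


    def render(value, context):
        """Recursively render {{ name }} placeholders in strings, lists and dicts.

        Simplified illustration of how Airflow templates the fields listed in
        template_fields; real Airflow uses a full Jinja environment.
        """
        if isinstance(value, str):
            # Replace each {{ name }} with its value from the context, if present.
            return re.sub(
                r"\{\{\s*(\w+)\s*\}\}",
                lambda m: str(context.get(m.group(1), m.group(0))),
                value,
            )
        if isinstance(value, list):
            return [render(v, context) for v in value]
        if isinstance(value, dict):
            return {k: render(v, context) for k, v in value.items()}
        return value


    # A steps structure like the one in the question (paths are placeholders).
    steps = [{
        "Name": "load",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "s3://my-bucket/my.jar",
            "Args": ["arg1", "arg2", "--cluster-id", "{{ job_flow_id }}"],
        },
    }]

    rendered = render(steps, {"job_flow_id": "j-1234"})
    print(rendered[0]["HadoopJarStep"]["Args"])
    # ['arg1', 'arg2', '--cluster-id', 'j-1234']
    ```

    Because steps is now in template_fields, the nested Args strings are rendered the same way job_flow_id already was, so EMR receives the actual cluster id instead of the raw template string.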
    

    In my DAG file I import the operator from the plugin in this way:

    from airflow.operators import CustomEmrAddStepsOperator
    

    The structure of my project and plugins looks like this:

    ├── config
    │   └── airflow.cfg
    ├── dags
    │   ├── __init__.py
    │   └── my_dag.py
    ├── plugins
    │   ├── __init__.py
    │   └── operators
    │       ├── __init__.py
    │       └── custom_emr_add_step_operator.py
    └── requirements.txt
    

    If you are using an IDE such as PyCharm, it will complain that it cannot find the module; this warning does not appear when Airflow actually runs. Also make sure that your airflow.cfg points to the right plugins folder, so that Airflow is able to read your newly created plugin.
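    For reference, the relevant setting is plugins_folder in the [core] section of airflow.cfg; the paths below are placeholders matching the tree above:

    ```ini
    [core]
    # adjust to the absolute paths of your project
    dags_folder = /path/to/project/dags
    plugins_folder = /path/to/project/plugins
    ```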