Search code examples
amazon-emrairflow

Using Json Input Variables In Airflow EMR Operator


I'm currently following the template given here: https://github.com/apache/airflow/blob/master/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py to create a DAG that starts an EMR cluster and runs a spark-submit step on it. When setting up SPARK_TEST_STEPS, I need to include variables passed in through the JSON body of a POST request to fill in the spark-submit arguments, like below:

# `kwargs` does not exist at module level, so the trigger payload cannot be
# read here directly. Each value is written as a Jinja template instead and
# is filled in from the triggering DAG run's `conf` when the task is rendered.
# NOTE(review): this relies on `steps` being a templated field of
# EmrAddStepsOperator in the installed Airflow version -- confirm.
SPARK_TEST_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                '/usr/lib/spark/bin/run-example',
                'SparkPi',
                "{{ dag_run.conf['var_1'] }}",
                "{{ dag_run.conf['var_2'] }}",
                "{{ dag_run.conf['var_3'] }}",
                '10',
            ],
        },
    },
]

How can I pass in variables given by the POST Json while still following the format given in the git link to look like below?

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator \
    import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator \
    import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator \
    import EmrTerminateJobFlowOperator

# Arguments handed to the DAG below as `default_args`.
DEFAULT_ARGS = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    # Placeholder address; both e-mail notification flags below are off.
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

# `kwargs` does not exist at module level, so the trigger payload cannot be
# read here directly. Each value is written as a Jinja template instead and
# is filled in from the triggering DAG run's `conf` when the task is rendered.
# NOTE(review): this relies on `steps` being a templated field of
# EmrAddStepsOperator in the installed Airflow version -- confirm.
SPARK_TEST_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                '/usr/lib/spark/bin/run-example',
                'SparkPi',
                "{{ dag_run.conf['var_1'] }}",
                "{{ dag_run.conf['var_2'] }}",
                "{{ dag_run.conf['var_3'] }}",
                '10',
            ],
        },
    },
]

# Passed as `job_flow_overrides` to the cluster-creation task; the only
# setting supplied here is the job flow's name.
JOB_FLOW_OVERRIDES = dict(Name='PiCalc')

# The DAG itself: cron schedule '0 3 * * *', a two-hour `dagrun_timeout`,
# and DEFAULT_ARGS as the default arguments for its tasks.
dag = DAG(
    dag_id='emr_job_flow_manual_steps_dag',
    schedule_interval='0 3 * * *',
    dagrun_timeout=timedelta(hours=2),
    default_args=DEFAULT_ARGS,
)

# First task: create the EMR job flow with JOB_FLOW_OVERRIDES, using the
# 'aws_default' and 'emr_default' connections. The later tasks pull this
# task's 'return_value' XCom to obtain the job flow id.
cluster_creator = EmrCreateJobFlowOperator(
    dag=dag,
    task_id='create_job_flow',
    emr_conn_id='emr_default',
    aws_conn_id='aws_default',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
)

# Second task: submit SPARK_TEST_STEPS to the job flow whose id is pulled
# from the 'create_job_flow' task's XCom.
step_adder = EmrAddStepsOperator(
    dag=dag,
    task_id='add_steps',
    steps=SPARK_TEST_STEPS,
    aws_conn_id='aws_default',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
)

# Third task: a sensor on the first step id returned by 'add_steps'
# (index [0] of its XCom value), within the job flow created earlier.
step_checker = EmrStepSensor(
    dag=dag,
    task_id='watch_step',
    aws_conn_id='aws_default',
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
)

# Final task: terminate the job flow created by 'create_job_flow'.
# NOTE(review): no trigger rule is set, so this presumably only runs when
# 'watch_step' succeeds -- confirm the cluster is not left running on failure.
cluster_remover = EmrTerminateJobFlowOperator(
    dag=dag,
    task_id='remove_cluster',
    aws_conn_id='aws_default',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
)

# Linear pipeline: create cluster -> add steps -> watch step -> remove cluster.
cluster_creator >> step_adder >> step_checker >> cluster_remover

Solution

  • I was dealing with a similar issue yesterday and fixed it with a solution like this. If you have a file somewhere containing the JSON you should be able to use the Variables set through Admin->Variables.

    Use a PythonOperator to read in the JSON file and save it to a local variable, set it to an Airflow Variable using Variable.set("VARIABLE_NAME",JSON_CONTENTS_VARIABLE).

    Then you can set the Variable's JSON contents to a step in the EmrAddStepsOperator by calling steps=Variable.get("VARIABLE_NAME", deserialize_json=True)

    Hope this helps.