In Airflow, I need to pass the job_flow_id to one of my EMR steps. I can retrieve the job_flow_id from the operator, but when I create the steps to submit to the cluster, the task_instance value is not rendered correctly.
I have the following code:
def issue_step(name, args):
    return [
        {
            "Name": name,
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "s3://....",
                "Args": args
            }
        }
    ]
dag = DAG('example',
          description='My dag',
          schedule_interval='0 8 * * 6',
          dagrun_timeout=timedelta(days=2))

try:
    create_emr = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        aws_conn_id='aws_default',
        dag=dag
    )

    load_data_steps = issue_step('load', ['arg1', 'arg2'])
    load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
    load_data_steps[0]["HadoopJarStep"]["Args"].append(
        "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}")  # the value here is not exchanged with the actual job_flow_id

    load_data = EmrAddStepsOperator(
        task_id='load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",  # this is correctly exchanged with the job_flow_id - same for the others
        aws_conn_id='aws_default',
        steps=load_data_steps,
        dag=dag
    )

    check_load_data = EmrStepSensor(
        task_id='watch_load_data',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        dag=dag
    )

    create_emr >> load_data
    load_data >> check_load_data
    check_load_data >> cluster_remover
except AirflowException as ae:
    print(ae.message)
The problem is that, when I check EMR, instead of seeing --cluster-id j-1234 in the load_data step, I see --cluster-id "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", which causes my step to fail.
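What EMR receives can be reproduced without Airflow at all: since steps is a plain Python structure and not (at the time) a templated field of EmrAddStepsOperator, the Jinja expression is passed through as a literal string. A minimal sketch:

```python
# Pure-Python sketch (no Airflow needed) of what happens at DAG-parse time:
# the Jinja expression appended to the step args is stored as a literal
# string, and nothing renders it unless the operator lists that field in
# its template_fields.

def issue_step(name, args):
    return [{
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {"Jar": "s3://....", "Args": args},
    }]

load_data_steps = issue_step('load', ['arg1', 'arg2'])
load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
load_data_steps[0]["HadoopJarStep"]["Args"].append(
    "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}")

# The placeholder survives verbatim -- which is exactly what EMR receives.
print(load_data_steps[0]["HadoopJarStep"]["Args"][-1])
```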
How can I get the actual value inside my step function?
Thanks and happy holidays
I found out that there is a PR on the Airflow repository about this. The issue is that there is no templating for steps in the EmrAddStepsOperator. To overcome this, I wrote a custom operator that subclasses EmrAddStepsOperator and adds steps to its template_fields.
Here is the code for the custom operator and the plugin, in the file custom_emr_add_step_operator.py (see tree below):
from __future__ import division, absolute_import, print_function

from airflow.plugins_manager import AirflowPlugin
from airflow.utils import apply_defaults
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator


class CustomEmrAddStepsOperator(EmrAddStepsOperator):
    template_fields = ['job_flow_id', 'steps']  # override with 'steps' to solve the issue above

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        super(CustomEmrAddStepsOperator, self).execute(context=context)


# Defining the plugin class
class CustomPlugin(AirflowPlugin):
    name = "custom_plugin"
    operators = [CustomEmrAddStepsOperator]
In my DAG file I import the custom operator from the plugin like this:
from airflow.operators import CustomEmrAddStepsOperator
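To see why adding 'steps' to template_fields fixes the problem, here is a simplified, self-contained sketch (no Airflow required) of what the templating machinery does: for every attribute named in template_fields it recursively renders any string it finds, including strings nested inside the lists and dicts of the steps structure. The render function below is a toy stand-in for Jinja2 (which is what Airflow actually uses), handling only the {{ ... }} placeholder shape used here; the job flow id is a made-up example value.

```python
import re

# Toy stand-in for Airflow's per-field templating. Assumption: real Airflow
# renders template_fields with Jinja2; this regex-based version only
# illustrates the recursion over nested structures, not the real engine.
PLACEHOLDER = re.compile(r"\{\{\s*(.+?)\s*\}\}")


def render(obj, context):
    """Recursively render placeholder strings inside nested lists/dicts."""
    if isinstance(obj, str):
        return PLACEHOLDER.sub(
            lambda m: str(context.get(m.group(1), m.group(0))), obj)
    if isinstance(obj, list):
        return [render(item, context) for item in obj]
    if isinstance(obj, dict):
        return {k: render(v, context) for k, v in obj.items()}
    return obj


# Value that the XCom pull would supply at run time (hypothetical id).
context = {
    "task_instance.xcom_pull('create_job_flow', key='return_value')": "j-1234",
}

steps = [{
    "Name": "load",
    "HadoopJarStep": {
        "Jar": "s3://....",
        "Args": ["arg1", "arg2", "--cluster-id",
                 "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}"],
    },
}]

# Because 'steps' is now a templated field, the whole structure is rendered
# before being submitted, so EMR sees the real cluster id:
rendered = render(steps, context)
print(rendered[0]["HadoopJarStep"]["Args"][-1])
```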
The structure of my project and plugins looks like this:
├── config
│ └── airflow.cfg
├── dags
│ ├── __init__.py
│ └── my_dag.py
├── plugins
│ ├── __init__.py
│ └── operators
│ ├── __init__.py
│ └── custom_emr_add_step_operator.py
└── requirements.txt
If you are using an IDE such as PyCharm, it will complain that it cannot find the module; when you run Airflow, however, the import resolves without problems.
Remember also to make sure that your airflow.cfg points to the right plugins folder so that Airflow is able to read your newly created plugin.
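For reference, the relevant setting is plugins_folder in the [core] section of airflow.cfg; the path below is an example and should point at the plugins/ directory from the tree above:

```ini
[core]
# example path -- point this at your own plugins/ directory
plugins_folder = /path/to/your/project/plugins
```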