I am trying to use Airflow to run 11 steps on AWS EMR, following this code as a reference. Using EmrAddStepsOperator and EmrStepSensor separately for each of the 11 steps would be too much repetition, so I am trying to loop through them. I have used the code below in my DAG.
step_adder = list()
step_checker = list()
steps = ['step1', 'step2', 'step3', 'step4', 'step5', 'step6'...till step11]
# @evalcontextfilter
# def dangerous_render(context, value):
# return Markup(Template(value).render(context)).render()
for i in range(0,len(steps)):
    #Add step
    step_adder.append(EmrAddStepsOperator(
        task_id=steps[i],
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=eval('step_'+str(i+1)),
    ))
    print(step_adder)
    #Step Sensor for checking
    step_checker.append(EmrStepSensor(
        task_id=steps[i]+'_check',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        #step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
        step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}").render({'params': {'step': steps[i]}}))',
        aws_conn_id='aws_default',
    ))
I am facing an error here. EmrStepSensor expects the step_id from EMR as input, and that value is fetched from XCom (I guess; I am not 100% sure how this code works). But my step names are stored in the steps list, so I can't put a static task_id value in step_id as the reference code does, and I can't figure out how to combine a Jinja template with a Python variable to fill in the values from the steps list.
I tried both of the approaches below so that step_id fetches the correct step from EMR according to the step name in steps[i]:
step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}")
However, both of these failed with a syntax error in Airflow. If anyone can point me in the right direction, I would really appreciate it. I am using Airflow 1.10.12 (the default version of Airflow in Managed Apache Airflow on AWS).
I'm not sure if this is already solved, so:
Using f-strings:
f"{{{{ task_instance.xcom_pull(task_ids='{steps[i]}', key='return_value')[0] }}}}"
Using .format:
"{{{{ task_instance.xcom_pull(task_ids='{}', key='return_value')[0] }}}}".format(steps[i])
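To see why the braces are doubled, here is a minimal sketch that builds the template strings in plain Python, without Airflow. The helper name step_id_template and the shortened steps list are hypothetical, used only for illustration. The f-string version assumes Python 3.6 or later.

```python
# Hypothetical, shortened list of step names; the original runs up to step11.
steps = ["step1", "step2", "step3"]


def step_id_template(task_id):
    """Build the Jinja template that pulls the first EMR step id pushed by task_id."""
    # In an f-string (and in str.format), '{{' produces one literal '{'.
    # So '{{{{' produces the '{{' that Jinja expects, and '}}}}' produces '}}'.
    return f"{{{{ task_instance.xcom_pull(task_ids='{task_id}', key='return_value')[0] }}}}"


# One template per step; each string could be passed as step_id to the sensor.
templates = [step_id_template(name) for name in steps]

for template in templates:
    print(template)
```

Each printed string is an ordinary Jinja expression with the task id already filled in, so Airflow only has to render the xcom_pull call at run time.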
Note that you have to make sure that the value of key task_ids is wrapped with single quotes. Also, the return from xcom_pull is a list, therefore the index [0] at the end o