python amazon-web-services airflow amazon-emr mwaa

Can't use python variable in jinja template with Airflow


I am trying to use Airflow to run 11 steps on AWS EMR, following this code as a reference. Writing out EmrAddStepsOperator and EmrStepSensor separately for each of the 11 steps would be too much repetition, so I am trying to loop through them. I have used the code below in my DAG.

step_adder = list()
step_checker = list()
steps = ['step1', 'step2', 'step3', 'step4', 'step5', 'step6'...till step11]

# @evalcontextfilter
# def dangerous_render(context, value):
#     return Markup(Template(value).render(context)).render()

for i in range(0,len(steps)):
    # Add step
    step_adder.append(EmrAddStepsOperator(
        task_id=steps[i],
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=eval('step_'+str(i+1)),
    ))
    print(step_adder)
    # Step sensor for checking
    step_checker.append(EmrStepSensor(
        task_id=steps[i]+'_check',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        #step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
        step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}").render({'params': {'step': steps[i]}}))',
        aws_conn_id='aws_default',
    ))

I am facing an error here. EmrStepSensor expects the step_id from EMR as input, and that value is fetched from XCom (I guess; I am not 100% sure how this code works). But my step names are stored in the steps list, so I can't put a static task_id inside step_id as the reference code does, and I can't figure out how to use a Jinja template with a Python variable to fill in the values from the steps list.

I tried both of the approaches below so that step_id fetches the correct step ID from EMR according to the step name in steps[i]:

step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",

step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}")

However, both of these failed with a syntax error in Airflow. If anyone can point me in the right direction, I would really appreciate it. I am using Airflow 1.10.12 (the default version of Airflow in Managed Apache Airflow on AWS).


Solution

  • I'm not sure if this is already solved, so:

    Using f-strings:

    f"{{{{ task_instance.xcom_pull(task_ids='{steps[i]}', key='return_value')[0] }}}}"

    Using .format:

    "{{{{ task_instance.xcom_pull(task_ids='{}', key='return_value')[0] }}}}".format(steps[i])

    Note that the value of task_ids must be wrapped in single quotes. Also, the value returned by xcom_pull is a list, hence the index [0] at the end.
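
    The brace escaping used in both variants can be checked without Airflow. The following is a minimal sketch in plain Python; the helper names step_id_template and step_id_template_format are hypothetical, and the three step names are placeholders. It only builds the template strings, which Airflow would later render with Jinja when the task runs.

```python
# Hypothetical helpers, not part of Airflow: they only build the Jinja
# template string that would be passed as step_id to EmrStepSensor.

def step_id_template(task_id):
    # In an f-string, "{{" yields one literal "{", so "{{{{" yields "{{".
    return f"{{{{ task_instance.xcom_pull(task_ids='{task_id}', key='return_value')[0] }}}}"


def step_id_template_format(task_id):
    # str.format follows the same escaping rule for literal braces.
    return "{{{{ task_instance.xcom_pull(task_ids='{}', key='return_value')[0] }}}}".format(task_id)


# Placeholder step names; the question uses step1 through step11.
steps = ["step1", "step2", "step3"]
templates = [step_id_template(name) for name in steps]

for template in templates:
    print(template)
```

    For step1 this prints {{ task_instance.xcom_pull(task_ids='step1', key='return_value')[0] }}, an ordinary Jinja expression with the step name already substituted. In the loop from the question, the same string could presumably be passed as step_id=step_id_template(steps[i]).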