Search code examples
pythonairflowjinja2f-string

f strings in xcom pull giving jinja undefined error in Airflow


I'm trying to pass a f string to the conf parameter of TriggerDagRunOperator. These tasks are run in a for loop and the external_dag_id changes in each iteration.

I tried the following:

previous_task_id = 'get_config_for_dag_' + external_dag_id
conf=f"{{{{ ti.xcom_pull(task_ids={previous_task_id} , key='return_value') }}}}"

However I'm getting a jinja error :

jinja2.exceptions.UndefinedError: 'get_config_for_dag_bla_bla_sbx' is undefined

Does anyone see an error in the f string that I'm building?

Here is a more complete part of the code:


dag_info = ['bla_bla_sbx']
for external_dag_id in dag_info:

    get_config_for_dag = PythonOperator(
                task_id="get_config_for_dag_" + external_dag_id,
                python_callable=get_dict_value,
                op_kwargs={
                    'config_dict': "{{ ti.xcom_pull(task_ids='build_dags_configuration', key='return_value') }}",
                    'current_dag_id': external_dag_id
                }
            )


    previous_task_id = 'get_config_for_dag_' + external_dag_id        
    run_dag = TriggerDagRunOperator(
                task_id="run_dag_" + external_dag_id,
                wait_for_completion=True,
                trigger_dag_id=external_dag_id,
                pool="dag_pool",
                conf=f"{{{{ ti.xcom_pull(task_ids={previous_task_id} , key='return_value') }}}}",
                )



Solution

  • In the end it worked like this:

    f"{{{{ ti.xcom_pull(task_ids='get_config_for_dag_{external_dag_id}' , key='return_value') }}}}"