I'm trying to pass an f-string to the conf parameter of TriggerDagRunOperator. The tasks are created in a for loop, and external_dag_id changes on each iteration.
I tried the following:
previous_task_id = 'get_config_for_dag_' + external_dag_id
conf=f"{{{{ ti.xcom_pull(task_ids={previous_task_id} , key='return_value') }}}}"
However, I'm getting a Jinja error:
jinja2.exceptions.UndefinedError: 'get_config_for_dag_bla_bla_sbx' is undefined
Does anyone see an error in the f-string that I'm building?
Here is a more complete part of the code:
dag_info = ['bla_bla_sbx']
for external_dag_id in dag_info:
    get_config_for_dag = PythonOperator(
        task_id="get_config_for_dag_" + external_dag_id,
        python_callable=get_dict_value,
        op_kwargs={
            'config_dict': "{{ ti.xcom_pull(task_ids='build_dags_configuration', key='return_value') }}",
            'current_dag_id': external_dag_id
        }
    )
    previous_task_id = 'get_config_for_dag_' + external_dag_id
    run_dag = TriggerDagRunOperator(
        task_id="run_dag_" + external_dag_id,
        wait_for_completion=True,
        trigger_dag_id=external_dag_id,
        pool="dag_pool",
        conf=f"{{{{ ti.xcom_pull(task_ids={previous_task_id} , key='return_value') }}}}",
    )
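In the end it worked once the task ID was wrapped in quotes inside the template, so that Jinja reads it as a string literal instead of a variable name: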
f"{{{{ ti.xcom_pull(task_ids='get_config_for_dag_{external_dag_id}' , key='return_value') }}}}"
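To see why the first version failed, it helps to print what each f-string actually produces before Jinja ever sees it. The snippet below is only a sketch in plain Python (no Airflow needed); the variable names `broken`, `fixed` and `alt` are mine, and the `!r` variant is an alternative I'm suggesting, not something from the original code.

```python
# Example value taken from the question.
external_dag_id = "bla_bla_sbx"
previous_task_id = "get_config_for_dag_" + external_dag_id

# Original attempt: the task ID is interpolated without quotes, so the
# rendered template contains a bare name that Jinja looks up as a variable.
broken = f"{{{{ ti.xcom_pull(task_ids={previous_task_id} , key='return_value') }}}}"

# Working version: the quotes are part of the template text, so Jinja
# receives a string literal.
fixed = f"{{{{ ti.xcom_pull(task_ids='get_config_for_dag_{external_dag_id}' , key='return_value') }}}}"

# Suggested alternative (an assumption, not from the original post):
# the !r conversion inserts repr(previous_task_id), which adds the quotes.
alt = f"{{{{ ti.xcom_pull(task_ids={previous_task_id!r} , key='return_value') }}}}"

print(broken)
# {{ ti.xcom_pull(task_ids=get_config_for_dag_bla_bla_sbx , key='return_value') }}
print(fixed)
# {{ ti.xcom_pull(task_ids='get_config_for_dag_bla_bla_sbx' , key='return_value') }}
```

The doubled braces were never the problem: `{{{{` and `}}}}` correctly collapse to `{{` and `}}`. The only difference between the two strings is the pair of quotes around the task ID, which is exactly what the UndefinedError was complaining about. Note that `!r` only matches the hand-quoted version as long as the task ID itself contains no quote characters.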