I am using Apache Airflow and trying to trigger multiple DAGs from within another DAG using TriggerDagRunOperator.expand().
I have two lists being returned from an upstream task via XCom:
confs → A list of dictionaries containing conf parameters.
dags_to_trigger → A list of DAG IDs to be triggered.
Each list contains 5 elements, and I want to pair them one-to-one (i.e., the first element of confs should be used with the first element of dags_to_trigger, the second with the second, and so on).
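For concreteness, the two lists have roughly this shape (the values here are made up for illustration):

confs = [{"source": "a"}, {"source": "b"}, {"source": "c"}, {"source": "d"}, {"source": "e"}]
dags_to_trigger = ["dag_1", "dag_2", "dag_3", "dag_4", "dag_5"]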
Problem:
When I use expand() like this:
trigger_my_dags = TriggerDagRunOperator.partial(
    task_id="trigger_my_dags",
    wait_for_completion=False,
).expand(conf=confs, trigger_dag_id=dags_to_trigger)
Airflow computes the Cartesian product of the two mapped lists, triggering 25 DAG runs (5 × 5) instead of 5: each conf from confs is paired with every DAG ID from dags_to_trigger, rather than being paired by index.
Question:
How can I ensure that Airflow correctly pairs the elements from both lists so that each DAG run gets the corresponding conf from confs?
Would restructuring the data into a single list of dictionaries like this help?
[{"conf": { ... }, "trigger_dag_id": "dag_1"}, {"conf": { ... }, "trigger_dag_id": "dag_2"}, ...]
If so, how do I correctly use expand() with this structure?
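For reference, this is roughly what I had in mind, assuming expand_kwargs() is the right method for mapping several keyword arguments together by index (build_pairs is a hypothetical upstream task of mine, untested):

from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

@task
def build_pairs(confs, dags_to_trigger):
    # Zip the two lists by index into the list-of-dicts structure shown above
    return [
        {"conf": c, "trigger_dag_id": d}
        for c, d in zip(confs, dags_to_trigger)
    ]

trigger_my_dags = TriggerDagRunOperator.partial(
    task_id="trigger_my_dags",
    wait_for_completion=False,
).expand_kwargs(build_pairs(confs, dags_to_trigger))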
Thanks in advance!
I don't know your Airflow version, but I hope this helps. You can call the operator's execute() method directly instead of using partial() + expand(). Here is an example:
from datetime import datetime
from typing import Any

from airflow import DAG
from airflow.models import BaseOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.context import Context


class RunnerOperator(BaseOperator):
    def execute(self, context: Context) -> Any:
        # Params for the DAG runs. From XCom / API / doesn't matter.
        params = {
            'configs': [
                {'conf': {'param_a': 1}, 'trigger_dag_id': 'dag_1'},
                {'conf': {'param_b': 2}, 'trigger_dag_id': 'dag_2'},
            ]
        }
        # Trigger each target DAG with its own conf, paired one-to-one.
        for dag_params in params['configs']:
            trigger_dag_id = dag_params['trigger_dag_id']
            TriggerDagRunOperator(
                task_id=f'{trigger_dag_id}_task',
                trigger_dag_id=trigger_dag_id,
                conf=dag_params['conf'],
                wait_for_completion=False,
            ).execute(context)


dag = DAG(dag_id='igor_atsberger', start_date=datetime(2025, 1, 1), schedule=None)
RunnerOperator(dag=dag, task_id='runner')

# Two dummy target DAGs so the example is self-contained.
dag_1 = DAG(dag_id='dag_1', start_date=datetime(2025, 1, 1), schedule=None)
dag_2 = DAG(dag_id='dag_2', start_date=datetime(2025, 1, 1), schedule=None)
for _dag in [dag_1, dag_2]:
    EmptyOperator(dag=_dag, task_id=f'{_dag.dag_id}_task')
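One caveat with this approach: because TriggerDagRunOperator.execute() is called manually inside RunnerOperator, all the triggers run inline within the single runner task, so they won't appear as separate mapped tasks in the Airflow UI.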
Checking the dag_1 and dag_2 run configs in the UI (screenshots omitted) shows each run received its own conf: {'param_a': 1} for dag_1 and {'param_b': 2} for dag_2.
Another easy option is to trigger the runs through the Airflow REST API.
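A minimal sketch of that approach, assuming an Airflow 2.x webserver with basic auth enabled (the URL and credentials below are placeholders):

import requests

AIRFLOW_URL = "http://localhost:8080"  # placeholder: your webserver URL
AUTH = ("admin", "admin")              # placeholder: basic-auth credentials

configs = [
    {"conf": {"param_a": 1}, "trigger_dag_id": "dag_1"},
    {"conf": {"param_b": 2}, "trigger_dag_id": "dag_2"},
]

for item in configs:
    # POST /api/v1/dags/{dag_id}/dagRuns creates a new run with the given conf
    response = requests.post(
        f"{AIRFLOW_URL}/api/v1/dags/{item['trigger_dag_id']}/dagRuns",
        auth=AUTH,
        json={"conf": item["conf"]},
    )
    response.raise_for_status()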