
How to correctly pair elements from two XCom lists in Airflow TriggerDagRunOperator.expand()?


I am using Apache Airflow and trying to trigger multiple DAGs from within another DAG using TriggerDagRunOperator.expand().

I have two lists being returned from an upstream task via XCom:

confs → A list of dictionaries containing conf parameters.

dags_to_trigger → A list of DAG IDs to be triggered.

Each list contains 5 elements, and I want to pair them one-to-one (i.e., the first element of confs should be used with the first element of dags_to_trigger, the second with the second, and so on).

Problem:

When I use expand() like this:

trigger_my_dags = TriggerDagRunOperator.partial(
    task_id="trigger_my_dags",
    wait_for_completion=False,
).expand(conf=confs, trigger_dag_id=dags_to_trigger)

Airflow cross-pairs the elements, triggering 25 DAG runs instead of 5: it takes each conf from confs and pairs it with every DAG ID from dags_to_trigger, rather than pairing them by index.

Question:

How can I ensure that Airflow correctly pairs the elements from both lists so that each DAG run gets the corresponding conf from confs?

Would restructuring the data into a single list of dictionaries like this help?

[{"conf": { ... }, "trigger_dag_id": "dag_1"}, {"conf": { ... }, "trigger_dag_id": "dag_2"}, ...]

If so, how do I correctly use expand() with this structure?

Thanks in advance!


Solution

  • I don't know your Airflow version, but I hope this helps. Instead of partial() + expand(), you can write a custom operator that builds one TriggerDagRunOperator per pair and calls its execute() method directly. Here is an example:

    from datetime import datetime
    from typing import Any
    
    from airflow import DAG
    from airflow.models import BaseOperator
    from airflow.operators.empty import EmptyOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    from airflow.utils.context import Context
    
    
    class RunnerOperator(BaseOperator):
        def execute(self, context: Context) -> Any:
            # Parameters for each DAG run; hardcoded here, but they could come from XCom, an API, etc.
            params = {
                'configs': [
                    {'conf': {'param_a': 1}, 'trigger_dag_id': 'dag_1'},
                    {'conf': {'param_b': 2}, 'trigger_dag_id': 'dag_2'},
                ]
            }
    
            for dag_params in params['configs']:
                trigger_dag_id = dag_params['trigger_dag_id']
                TriggerDagRunOperator(
                    task_id=f'{trigger_dag_id}_task',
                    trigger_dag_id=trigger_dag_id,
                    conf=dag_params['conf'],
                    wait_for_completion=False,
                ).execute(context)
    
    
    dag = DAG(dag_id='igor_atsberger', start_date=datetime(2025, 1, 1), schedule=None)
    RunnerOperator(dag=dag, task_id='runner')
    
    dag_1 = DAG(dag_id='dag_1', start_date=datetime(2025, 1, 1), schedule=None)
    dag_2 = DAG(dag_id='dag_2', start_date=datetime(2025, 1, 1), schedule=None)
    for _dag in [dag_1, dag_2]:
        EmptyOperator(dag=_dag, task_id=f'{_dag.dag_id}_task')
    

    Let's check the dag_1 config:

    [Image: dag 1]

    Let's check the dag_2 config:

    [Image: dag 2]
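
If you would rather keep dynamic task mapping, the single-list structure from the question can be built by zipping the two lists by index. This is only a minimal sketch: `pair_by_index` is a hypothetical helper name, and the commented-out `expand_kwargs()` call assumes Airflow 2.4 or later, so check it against your version.

```python
from typing import Any


def pair_by_index(confs: list[dict[str, Any]], dag_ids: list[str]) -> list[dict[str, Any]]:
    """Zip the two lists into one list of per-run keyword dicts (hypothetical helper)."""
    if len(confs) != len(dag_ids):
        raise ValueError("confs and dag_ids must have the same length")
    return [
        {"conf": conf, "trigger_dag_id": dag_id}
        for conf, dag_id in zip(confs, dag_ids)
    ]


# Sample values reused from the example operator above.
paired = pair_by_index(
    [{"param_a": 1}, {"param_b": 2}],
    ["dag_1", "dag_2"],
)
print(paired)

# Assumption: on Airflow 2.4+ a list shaped like this (for example, returned
# by an upstream task) could be passed to expand_kwargs(), which maps one
# task instance per dict instead of building a cross product:
#
# TriggerDagRunOperator.partial(
#     task_id="trigger_my_dags",
#     wait_for_completion=False,
# ).expand_kwargs(paired)
```

Each dict carries both arguments for one run, so five pairs would give five mapped tasks rather than 25.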

    Another easy way is to trigger the DAG runs through the Airflow REST API.
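
A rough sketch of the REST API route, under a few assumptions: the base URL is hypothetical, the `/api/v1/dags/{dag_id}/dagRuns` endpoint is the Airflow 2.x stable REST API (other major versions may use a different prefix or require extra fields), and authentication is left out because it depends on your deployment. The code only builds the requests; sending them is shown in a comment.

```python
import json
from urllib.request import Request

# Hypothetical base URL; adjust the host and API prefix to your deployment.
BASE_URL = "http://localhost:8080/api/v1"

# Same paired structure as in the example operator above.
configs = [
    {"conf": {"param_a": 1}, "trigger_dag_id": "dag_1"},
    {"conf": {"param_b": 2}, "trigger_dag_id": "dag_2"},
]


def build_trigger_request(dag_id: str, conf: dict) -> Request:
    """Build (but do not send) a POST that asks Airflow to create one DAG run."""
    return Request(
        url=f"{BASE_URL}/dags/{dag_id}/dagRuns",
        data=json.dumps({"conf": conf}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# One request per pair, so each DAG gets only its own conf.
requests_to_send = [
    build_trigger_request(item["trigger_dag_id"], item["conf"])
    for item in configs
]

for req in requests_to_send:
    print(req.get_method(), req.full_url, req.data.decode("utf-8"))

# To actually send them, add the auth headers your deployment needs and call
# urllib.request.urlopen(req) for each request.
```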