directed-acyclic-graphs airflow

How do we trigger multiple Airflow DAGs using TriggerDagRunOperator?


I have a scenario in which a particular DAG, upon completion, needs to trigger multiple DAGs. I have used TriggerDagRunOperator to trigger a single DAG. Is it possible to pass multiple DAGs to TriggerDagRunOperator so that it triggers all of them?

Also, is it possible to trigger them only upon successful completion of the current DAG?


Solution

  • I faced the same problem. There is no out-of-the-box solution, but we can write a custom operator for it.

    Here is the code of a custom operator that takes python_callable and trigger_dag_id as arguments:

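    # Import paths as in Airflow 1.x, where DagRunOrder is available
    from datetime import datetime

    from airflow import settings
    from airflow.models import DagBag
    from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
    from airflow.utils.decorators import apply_defaults
    from airflow.utils.state import State


    class TriggerMultiDagRunOperator(TriggerDagRunOperator):
        """Create one run of trigger_dag_id per DagRunOrder returned by python_callable."""

        @apply_defaults
        def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
            super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
            # Extra positional and keyword arguments passed to python_callable
            self.op_args = op_args or []
            self.op_kwargs = op_kwargs or {}

        def execute(self, context):
            session = settings.Session()
            created = False
            # Load the target DAG once instead of re-parsing the DAGs folder for every run
            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)

            for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
                # Stop at the first item that is empty or not a DagRunOrder
                if not dro or not isinstance(dro, DagRunOrder):
                    break

                # Generate a run_id when the callable did not set one
                if dro.run_id is None:
                    dro.run_id = 'trig__' + datetime.utcnow().isoformat()

                dr = trigger_dag.create_dagrun(
                    run_id=dro.run_id,
                    state=State.RUNNING,
                    conf=dro.payload,
                    external_trigger=True
                )
                created = True
                self.log.info("Creating DagRun %s", dr)

            if created:
                session.commit()
            else:
                self.log.info("No DagRun created")
            session.close()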
    

    trigger_dag_id is the ID of the DAG that we want to run multiple times.

    python_callable is a function that should return a list of DagRunOrder objects (any iterable works, since the operator simply loops over the result), one object for each instance of the DAG with dag_id trigger_dag_id that should be scheduled.

    Code and examples on GitHub: https://github.com/mastak/airflow_multi_dagrun

    A bit more description of this code: https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13
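
As a rough illustration of the python_callable described above, the sketch below yields one DagRunOrder per desired run. The names generate_dag_runs and count, the payload contents, and the task and DAG IDs in the commented wiring are hypothetical. The fallback DagRunOrder class is only a stand-in that mirrors the Airflow 1.x class, so the snippet also runs where Airflow is not installed.

```python
try:
    # Airflow 1.x location of DagRunOrder
    from airflow.operators.dagrun_operator import DagRunOrder
except ImportError:
    # Stand-in with the same two attributes (assumption: used only so the
    # sketch runs without Airflow 1.x installed)
    class DagRunOrder(object):
        def __init__(self, run_id=None, payload=None):
            self.run_id = run_id
            self.payload = payload


def generate_dag_runs(context, count=3):
    """Yield one DagRunOrder for each DAG run that should be created."""
    for index in range(count):
        # run_id is left as None so the operator generates one itself;
        # payload becomes the conf of the triggered DAG run
        yield DagRunOrder(payload={"index": index})


# Hypothetical wiring inside a DAG file (task_id and DAG IDs are made up):
# trigger = TriggerMultiDagRunOperator(
#     task_id="trigger_many",
#     trigger_dag_id="target_dag",
#     python_callable=generate_dag_runs,
#     op_kwargs={"count": 3},
#     dag=dag,
# )
```

Because the operator calls python_callable(context, *op_args, **op_kwargs), the callable must accept the task context as its first positional argument, and anything passed through op_kwargs (here count) arrives as a keyword argument.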