Search code examples
airflow

How do we trigger single airflow dag multipler times using TriggerDagRunOperator?


I have a Dag with schedule interval None. I want to trigger this Dag by TriggerDagRunOperator multiple times in a day.

I crated a PreDag with schedule_interval "* 1/12 * * *"
Inside PreDag a task of TriggerDagRunOperator runs that Trigger the main Dag.
As scheduled PreDag runs twice a day 1st time when PreDag runs it trigger the Dag but 2nd time when PreDag runs then task of triggerDagRunOperator show error : " A Dag Run already exists for dag id {{ dag_id}} at {{ execution_date}} with run id {{ trigger_run_id}}" `

trigger_run = TriggerDagRunOperator(
                task_id="main_dag_trigger",
                trigger_dag_id=str('DW_Test_TriggerDag'),  
                pool='branch_pool_limit', 
                wait_for_completion=True, 
                poke_interval=20, 
                trigger_run_id = 'trig__' + str(datetime.now()),
                execution_date = '{{ ds }}',
                # reset_dag_run = True ,
                dag = predag
            )`

Is it possible to Trigger a dag multiple times in a day using TriggerDagRunOperator.


Solution

  • Airflow uses execution_date and dag_id as ID for dag run table, so when the dag is triggered for the second time, there is a run with the same execution_date created in the first run.

    Why do you have this problem? that's because you are using {{ ds }} as execution_date for the run:

    The DAG run’s logical date as YYYY-MM-DD. Same as {{ dag_run.logical_date | ds }}.

    which is the date of your run and not the datetime, and the date of two runs triggered in the same day is the same.

    You can fix it by replacing {{ ds }} by {{ ts }}