Search code examples
pythonairflowworkflow

Why was the 'python_callable' option removed from TriggerDagRunOperator in Airflow 2.0?


I can see that before Airflow 2.0, it was possible to use TriggerDagRunOperator with a condition by passing it a python_callable option:

def foo(context, dag_run_obj):
    if True:
        return dag_run_obj

dag = DAG(dag_id='test_trigger_dag_run_for_Sid',
          default_args={"owner" : "me",
                        "start_date":datetime.now()},
          schedule_interval='*/1 * * * *')

trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun',
                                trigger_dag_id="simple_dummy_dag_v1",
                                python_callable=foo,
                                dag=dag)

But now this option has disappeared from the doc in the newest version. Why ? And how can I put a condition to the trigger without it ?


Solution

  • I don't know why that changed, but you can use BranchPythonOperator (very well explained here).

    You can do something like this:

    # Function definition
    def _branch_trigger_or_not():
        condition_to_trigger = True  # Change here to whatever you need
        if condition_to_trigger:
            return 'test_trigger_dagrun'
        else:
            return 'another_task_id'
    
    ### Other code here
    
    # Inside DAG context
    trigger_or_not = BranchPythonOperator(
        task_id='trigger_or_not',
        python_callable=_branch_trigger_or_not
    )
    
    # Dependencies definition
    trigger_or_not >> [trigger, another_task]