I can see that before Airflow 2.0, it was possible to use TriggerDagRunOperator with a condition by passing it a python_callable
option:
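from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator  # Airflow 1.x import path

def foo(context, dag_run_obj):
    # Return dag_run_obj to trigger the run; return None to skip it
    if True:
        return dag_run_obj

dag = DAG(dag_id='test_trigger_dag_run_for_Sid',
          default_args={"owner": "me",
                        "start_date": datetime.now()},
          schedule_interval='*/1 * * * *')

trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun',
                                trigger_dag_id="simple_dummy_dag_v1",
                                python_callable=foo,
                                dag=dag)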
But this option no longer appears in the documentation for the newest version. Why? And how can I add a condition to the trigger without it?
I don't know why that changed, but you can use BranchPythonOperator
(very well explained here).
You can do something like this:
# Function definition
def _branch_trigger_or_not():
    condition_to_trigger = True  # Change here to whatever you need
    if condition_to_trigger:
        return 'test_trigger_dagrun'
    else:
        return 'another_task_id'

### Other code here

# Inside DAG context
trigger_or_not = BranchPythonOperator(
    task_id='trigger_or_not',
    python_callable=_branch_trigger_or_not
)

# Dependencies definition
trigger_or_not >> [trigger, another_task]
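To make the condition less abstract, here is a minimal sketch of what the branch callable could look like with a real check in place of the hard-coded True. The weekday rule, the function name branch_trigger_or_not and the two constants are hypothetical, chosen only for illustration; the task ids are the ones used above. The function is plain Python, so it can be checked without an Airflow installation.

```python
from datetime import datetime

# Task ids taken from the snippets above
TRIGGER_TASK_ID = "test_trigger_dagrun"
SKIP_TASK_ID = "another_task_id"


def branch_trigger_or_not(logical_date: datetime) -> str:
    """Return the task_id of the branch to follow.

    Hypothetical condition: only trigger the other DAG on weekdays.
    """
    # weekday() is 0 for Monday through 6 for Sunday
    if logical_date.weekday() < 5:
        return TRIGGER_TASK_ID
    return SKIP_TASK_ID


if __name__ == "__main__":
    # A weekday follows the trigger branch, a weekend day the other one
    print(branch_trigger_or_not(datetime(2024, 1, 1)))
    print(branch_trigger_or_not(datetime(2024, 1, 6)))
```

Assuming Airflow 2.2 or later, where the task context has a logical_date entry, passing this function as python_callable to BranchPythonOperator should let Airflow fill in the argument by name; on older 2.x versions the parameter would probably need to be called execution_date instead.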