Hello, I am trying to set up a conditional dependency in Airflow. In the flow below, my objective is to run `print-conf-success` only after the successful execution of both `print-conf-1` and `print-conf-2`, and to run `print-conf-failure` if either of them fails. In the dependency below, I set the upstream as a list, [print-conf-2, print-conf-1], expecting both tasks to become upstream; however, instead of having both tasks as upstream, it is coming up as downstream of each of them. What is the correct way to set the dependency so that `print-conf-success` requires a success status from both [print-conf-2, print-conf-1], and `print-conf-failure` runs on a failure of either of them?
"""Example DAG demonstrating the usage of the PythonOperator."""
import time
from pprint import pprint
from datetime import datetime
from airflow.utils.trigger_rule import TriggerRule
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
# Settings handed to the DAG as ``default_args``.
DEFAULT_ARGS = dict(
    owner='admin',
    depends_on_past=False,
    start_date=datetime(2022, 5, 20, 0),
    retries=2,
)
def print_log(**kwargs):
    """Print a fixed three-line marker to stdout.

    Args:
        **kwargs: Accepted and ignored, so the function can be invoked with
            arbitrary keyword arguments.

    Returns:
        None.
    """
    print("--------------------")
    print("1, 2, 3")
    print("--------------------")
def print_log_failed(**kwargs):
    """Print the three-line "failed" marker to stdout.

    This function only prints; it does not raise, so a call to it completes
    normally.

    Args:
        **kwargs: Accepted and ignored, so the function can be invoked with
            arbitrary keyword arguments.

    Returns:
        None.
    """
    print("--------------------")
    print("1, 2, 3, failed")
    print("--------------------")
# Manually triggered DAG (schedule_interval=None) wiring five PythonOperator
# tasks together.
with DAG(
    dag_id="test_dag",
    schedule_interval=None,
    default_args=DEFAULT_ARGS,
    max_active_runs=10,
) as dag:
    # Intended to run once print-conf-1 and print-conf-2 have both succeeded.
    log_conf = PythonOperator(
        task_id='print-conf-success',
        provide_context=True,
        python_callable=print_log,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag)

    # NOTE(review): this task is configured exactly like print-conf-success
    # (same callable, ALL_SUCCESS), so it does not act as a failure handler;
    # TriggerRule.ONE_FAILED is presumably what was intended -- confirm.
    log_conf_failure = PythonOperator(
        task_id='print-conf-failure',
        provide_context=True,
        python_callable=print_log,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag)

    log_conf_1 = PythonOperator(
        task_id='print-conf-1',
        provide_context=True,
        python_callable=print_log,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag)

    log_conf_2 = PythonOperator(
        task_id='print-conf-2',
        provide_context=True,
        python_callable=print_log,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag)

    log_conf_3 = PythonOperator(
        task_id='print-conf-3',
        provide_context=True,
        python_callable=print_log_failed,
        trigger_rule=TriggerRule.ONE_FAILED,
        dag=dag)

    # Both print-conf-1 and print-conf-2 become upstream of the success and
    # failure tasks.
    log_conf.set_upstream([log_conf_1, log_conf_2])
    log_conf_failure.set_upstream([log_conf_1, log_conf_2])

    # ``a >> [b, c]`` makes ``a`` upstream of ``b`` and ``c``, so print-conf-3
    # runs *before* print-conf-1 and print-conf-2 here.
    # NOTE(review): print-conf-3 therefore has no upstream tasks for its
    # ONE_FAILED rule to observe -- confirm this direction is intended.
    log_conf_3 >> ([log_conf_1, log_conf_2])
I think this is what you are after:
`print-conf-1`, `print-conf-2`, and `print-conf-3` can each succeed or fail (for demonstration, in the code below `print-conf-3` will always fail).
`print-conf-failure` will be executed only if at least one upstream task has failed.
`print-conf-success` will be executed only if all upstream tasks are successful.
Code:
from datetime import datetime
from airflow.utils.trigger_rule import TriggerRule
from airflow import DAG, AirflowException
from airflow.operators.python import PythonOperator
# Settings handed to the DAG as ``default_args``.
DEFAULT_ARGS = {
    "owner": "admin",
    "depends_on_past": False,
    "start_date": datetime(year=2022, month=5, day=20, hour=0),
    "retries": 2,
}
def print_log(**kwargs):
    """Print a fixed three-line marker to stdout.

    Args:
        **kwargs: Accepted and ignored, so the function can be invoked with
            arbitrary keyword arguments.

    Returns:
        None.
    """
    print("--------------------")
    print("1, 2, 3")
    print("--------------------")
def print_log_failed(**kwargs):
    """Print the three-line "failed" marker, then always raise.

    Args:
        **kwargs: Accepted and ignored, so the function can be invoked with
            arbitrary keyword arguments.

    Raises:
        AirflowException: Unconditionally, after the marker is printed, so
            that a task using this callable ends in a failed state.
    """
    print("--------------------")
    print("1, 2, 3, failed")
    print("--------------------")
    raise AirflowException("failing")
# Manually triggered DAG (schedule_interval=None) demonstrating trigger rules:
# one task that runs on success of its upstreams and one that runs when any
# upstream fails.
#
# ``provide_context=True`` is not passed to the operators: this file imports
# from ``airflow.operators.python`` (the Airflow 2 module path), where the
# context is always supplied and the argument is deprecated.
with DAG(
    dag_id="example_test_dag",
    schedule_interval=None,
    default_args=DEFAULT_ARGS,
    max_active_runs=10,
) as dag:
    # No trigger_rule given, so the operator's default rule applies
    # (all upstream tasks must succeed).
    log_conf = PythonOperator(
        task_id='print-conf-success',
        python_callable=print_log,
    )

    # Runs as soon as at least one upstream task has failed.
    log_conf_failure = PythonOperator(
        task_id='print-conf-failure',
        python_callable=print_log,
        trigger_rule=TriggerRule.ONE_FAILED,
    )

    log_conf_1 = PythonOperator(
        task_id='print-conf-1',
        python_callable=print_log,
    )

    log_conf_2 = PythonOperator(
        task_id='print-conf-2',
        python_callable=print_log,
    )

    # Uses the callable that always raises, to demonstrate the failure path.
    log_conf_3 = PythonOperator(
        task_id='print-conf-3',
        python_callable=print_log_failed,
    )

    # ``[a, b] >> c`` makes ``a`` and ``b`` upstream of ``c``.
    [log_conf_1, log_conf_2] >> log_conf
    [log_conf_1, log_conf_2, log_conf_3] >> log_conf_failure