Tags: airflow, airflow-scheduler

Airflow setting conditional dependency


Hello, I am trying to set a conditional dependency in Airflow. In the flow below, my objective is to run print-conf-success only after both print-conf-1 and print-conf-2 execute successfully, and to run print-conf-failure if either of them fails. In the dependency below I set the upstream as the list [print-conf-2, print-conf-1], expecting both tasks to become upstream; however, instead of having both tasks as upstream, the task is coming up as downstream of each of them. What is the correct way to set the dependencies so that print-conf-success requires both [print-conf-2, print-conf-1] to have success status, and print-conf-failure runs if either of them fails?

"""Example DAG demonstrating the usage of the PythonOperator."""
import time
from pprint import pprint
from datetime import datetime
from airflow.utils.trigger_rule import TriggerRule
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator

DEFAULT_ARGS = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': datetime(2022, 5, 20, 0),
    'retries': 2
}


def print_log(**kwargs):
    print("--------------------")
    print("1, 2, 3")
    print("--------------------")


def print_log_failed(**kwargs):
    print("--------------------")
    print("1, 2, 3, failed")
    print("--------------------")


with DAG(dag_id="test_dag", schedule_interval=None, default_args=DEFAULT_ARGS, max_active_runs=10) as dag:
    log_conf = PythonOperator(
        task_id='print-conf-success',
        provide_context=True,
        python_callable=print_log,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag)

    log_conf_failure = PythonOperator(
        task_id='print-conf-failure',
        provide_context=True,
        python_callable=print_log,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag)

    log_conf_1 = PythonOperator(
        task_id='print-conf-1',
        provide_context=True,
        python_callable=print_log,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag)

    log_conf_2 = PythonOperator(
        task_id='print-conf-2',
        provide_context=True,
        python_callable=print_log,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag)

    log_conf_3 = PythonOperator(
        task_id='print-conf-3',
        provide_context=True,
        python_callable=print_log_failed,
        trigger_rule=TriggerRule.ONE_FAILED,
        dag=dag)

    log_conf.set_upstream([log_conf_1, log_conf_2])
    log_conf_failure.set_upstream([log_conf_1, log_conf_2])

    log_conf_3 >> ([log_conf_1, log_conf_2])

[Image: DAG flow]


Solution

  • I think this is what you are after:

    [Image: resulting DAG graph]

    print-conf-1, print-conf-2, and print-conf-3 can each succeed or fail (for demonstration, print-conf-3 in the code below always fails).

    print-conf-failure will be executed only if at least one upstream task has failed.

    print-conf-success will be executed only if all upstream tasks are successful.

    code:

    from datetime import datetime
    from airflow.utils.trigger_rule import TriggerRule
    from airflow import DAG, AirflowException
    from airflow.operators.python import PythonOperator
    
    DEFAULT_ARGS = {
        'owner': 'admin',
        'depends_on_past': False,
        'start_date': datetime(2022, 5, 20, 0),
        'retries': 2
    }
    
    
    def print_log(**kwargs):
        print("--------------------")
        print("1, 2, 3")
        print("--------------------")
    
    
    def print_log_failed(**kwargs):
        print("--------------------")
        print("1, 2, 3, failed")
        print("--------------------")
        raise AirflowException("failing")
    
    
    with DAG(dag_id="example_test_dag", schedule_interval=None, default_args=DEFAULT_ARGS, max_active_runs=10) as dag:
        log_conf = PythonOperator(
            task_id='print-conf-success',
            provide_context=True, # Remove this line if you are on Airflow 2
            python_callable=print_log)
    
        log_conf_failure = PythonOperator(
            task_id='print-conf-failure',
            provide_context=True, # Remove this line if you are on Airflow 2
            python_callable=print_log,
            trigger_rule=TriggerRule.ONE_FAILED)
    
        log_conf_1 = PythonOperator(
            task_id='print-conf-1',
            provide_context=True, # Remove this line if you are on Airflow 2
            python_callable=print_log)
    
        log_conf_2 = PythonOperator(
            task_id='print-conf-2',
            provide_context=True, # Remove this line if you are on Airflow 2
            python_callable=print_log)
    
        log_conf_3 = PythonOperator(
            task_id='print-conf-3',
            provide_context=True, # Remove this line if you are on Airflow 2
            python_callable=print_log_failed)
    
        # Default trigger rule (ALL_SUCCESS): runs only when both upstream tasks succeed
        [log_conf_1, log_conf_2] >> log_conf
        # ONE_FAILED: runs as soon as any of the three upstream tasks fails
        [log_conf_1, log_conf_2, log_conf_3] >> log_conf_failure
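
    A side note on the upstream/downstream confusion in the question: Airflow reads x >> y as "y is downstream of x", so the line log_conf_3 >> [log_conf_1, log_conf_2] in the question makes log_conf_3 upstream of both print tasks, the reverse of what was intended. set_upstream, <<, and a reversed >> all declare the same edge. A minimal sketch of the equivalent forms (task names reused from the DAG above):

    # All three lines declare the same dependency:
    # log_conf runs after both log_conf_1 and log_conf_2.
    [log_conf_1, log_conf_2] >> log_conf             # list on the left of >>
    log_conf << [log_conf_1, log_conf_2]             # reversed bitshift
    log_conf.set_upstream([log_conf_1, log_conf_2])  # explicit method call

    You can sanity-check the resulting graph and the trigger rules locally with the Airflow 2 CLI, e.g. airflow dags test example_test_dag 2022-05-20.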