
Dynamic trigger rule?


I have a DAG where I have 3 sensors and a data processing task that must run after the sensors have completed successfully.

Something like this:


sensorA -->
           \
sensorB -->-+- data processing
           /
sensorC -->

The twist is: the downstream task won't always need to wait on all 3 sensors - it might only need to wait on 1 or 2 of them. Which sensors to wait on will get decided outside of the DAG, based on user interactions.

I know I can use a BranchPythonOperator to read the external value indicating which sensor(s) should be chosen, and I can use trigger rules to ensure that the data processing task runs after at least one sensor finishes successfully. But I realized I can't tell the data processing step which sensors need to complete.

So now I'm thinking I need to do something more like this:

BranchOperator ->--+--> Sensor A ---------------------------->-+ 
                   |                                           |
                   +--> Sensor B ---------------------------->-+
                   |                                           |
                   +--> Sensor C ---------------------------->-+
                   |                                           |
                   +--> Sensor A --> Sensor B --------------->-+
                   |                                           |
                   +--> Sensor A --> Sensor C --------------->-+
                   |                                           |
                   +--> Sensor B --> Sensor C --------------->-+
                   |                                           |
                   +--> Sensor A --> Sensor B --> Sensor C -->-+--> data processing

And the data processing step has the trigger rule "none_failed_min_one_success".

But this feels rather clunky and awkward, and since I know there will be more than just these 3 sensors in the future, it's just going to get more cluttered.

Is there a more elegant way for the data processing step's trigger rule to be aware of which sensors it should wait for?


Solution

  • You actually identified the right components to solve your issue, but you've misinterpreted how branching works. There is no need to enumerate every possible combination: the branch operator just needs to return the task_id(s) of the sensors that are required to run, and the others will be skipped automatically.

    Code template example:

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task
    from airflow.operators.empty import EmptyOperator
    from airflow.utils.trigger_rule import TriggerRule

    with DAG(
        dag_id="sensors",
        start_date=datetime(2024, 1, 1),
        schedule=None,
        catchup=False
    ) as dag:

        @task.branch
        def branch_func():
            # Replace with your logic for choosing which sensors to run
            return ["sensorA", "sensorB"]

        a = EmptyOperator(task_id="sensorA")  # Replace with a Sensor
        b = EmptyOperator(task_id="sensorB")  # Replace with a Sensor
        c = EmptyOperator(task_id="sensorC")  # Replace with a Sensor

        data_processing = EmptyOperator(
            task_id="data_processing",
            trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
        )

        branch_func() >> [a, b, c] >> data_processing
    

    This will give:

    [screenshot: DAG graph with sensorA and sensorB successful, sensorC skipped, data_processing successful]

    This is expected: data_processing is eligible to run because sensorA and sensorB finished successfully and sensorC was skipped.

    To simulate what happens in case of failure, I manually set sensorA to failed and reran data_processing:

    [screenshot: DAG graph with sensorA failed and data_processing in the upstream_failed state]

    As you can see, in this case data_processing is set to the upstream_failed state, which means the task cannot run because an upstream dependency was not met.

    If the branch returns an empty list (all sensors are to be skipped), then data_processing is also skipped.

    [screenshot: DAG graph with all three sensors skipped and data_processing skipped]
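    As a sketch of how branch_func might compute its return value: assuming the externally decided choice arrives as a list of sensor names (how you fetch it, e.g. from an Airflow Variable, is up to you), a hypothetical helper like chosen_sensor_task_ids can map it to the task_ids the branch should return:

    # Hypothetical helper for branch_func: maps an externally supplied
    # selection of sensor names to the branch's return value.
    ALL_SENSOR_TASK_IDS = ["sensorA", "sensorB", "sensorC"]

    def chosen_sensor_task_ids(selection):
        """Return the task_ids of the sensors the user selected.

        Unknown names are ignored; an empty result means every sensor
        (and, via the trigger rule, data_processing) will be skipped.
        """
        selected = set(selection)
        return [tid for tid in ALL_SENSOR_TASK_IDS if tid in selected]

    Adding a new sensor then only requires extending ALL_SENSOR_TASK_IDS and the DAG's task list, not a new branch path per combination.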