I have a DAG with 3 sensors and a data processing task that must run after the sensors have completed successfully.
Something like this:
sensorA -->\
sensorB -->-+-> data processing
sensorC -->/
The twist is: the downstream task won't always need to wait on all 3 sensors - it might only need to wait on 1 or 2 of them. Which sensors to wait on will get decided outside of the DAG, based on user interactions.
I know I can use a BranchPythonOperator to read the external value that indicates which sensor(s) should be chosen, and I can use trigger rules to ensure the data processing task runs after at least one sensor finishes successfully. But I realized I can't tell the data processing step which sensors need to complete.
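For example, if the DAG is triggered with a user-supplied payload, the branch callable might look roughly like this (the "sensors" conf key is made up; any external source would do):

from airflow.operators.python import BranchPythonOperator

def choose_sensors(dag_run, **_):
    # Hypothetical trigger payload, e.g. {"sensors": ["sensorA", "sensorB"]}
    return dag_run.conf.get("sensors", [])

branch = BranchPythonOperator(task_id="branch", python_callable=choose_sensors)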
So now I'm thinking I need to do something more like this:
BranchOperator ->--+--> Sensor A ---------------------------->-+
                   |                                           |
                   +--> Sensor B ---------------------------->-+
                   |                                           |
                   +--> Sensor C ---------------------------->-+
                   |                                           |
                   +--> Sensor A --> Sensor B --------------->-+
                   |                                           |
                   +--> Sensor A --> Sensor C --------------->-+
                   |                                           |
                   +--> Sensor B --> Sensor C --------------->-+
                   |                                           |
                   +--> Sensor A --> Sensor B --> Sensor C -->-+--> data processing
And the data processing step has the trigger rule "none_failed_min_one_success".
But this feels rather clunky and awkward, and since I know there will be more than just these 3 sensors in the future, it's just going to get more cluttered.
Is there a more elegant way for the data processing step's trigger rule to be aware of which sensors it should wait for?
You actually figured out the right components to solve your issue, but you've misinterpreted how branching works. There is no need to specify every possible combination: the branch operator just needs to return the task_id(s) of the sensors required for the run, and the others will be skipped automatically.
Code template example:
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="sensors",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:

    @task.branch
    def branch_func():
        # Replace with your logic for choosing which sensors to run
        return ["sensorA", "sensorB"]

    a = EmptyOperator(task_id="sensorA")  # Replace with Sensor
    b = EmptyOperator(task_id="sensorB")  # Replace with Sensor
    c = EmptyOperator(task_id="sensorC")  # Replace with Sensor

    data_processing = EmptyOperator(
        task_id="data_processing",
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
    )

    branch_func() >> [a, b, c] >> data_processing
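To move from the template to real sensors, only the operator construction changes; for instance with a FileSensor (the file path and timing values below are placeholders):

from airflow.sensors.filesystem import FileSensor

a = FileSensor(
    task_id="sensorA",
    filepath="/data/incoming/a.csv",  # placeholder path to wait for
    poke_interval=30,  # poke every 30 seconds
    timeout=600,       # give up after 10 minutes
)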
Running the template gives the expected result: data_processing is eligible to run because sensorA and sensorB finished with success and sensorC is skipped.
To simulate what happens in case of failure, I manually set sensorA to failed and reran data_processing: this time data_processing is set to the upstream_failed status, which means the task cannot run because an upstream dependency was not met.
If the branch returns an empty list (i.e., all sensors are to be skipped), then data_processing is also skipped.
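Since you mention more sensors are coming, one way to keep the branch future-proof (this is my own sketch; the sensor_selection Variable name and its JSON format are assumptions) is to validate the external selection against the known sensor task_ids:

from airflow.decorators import task
from airflow.models import Variable

SENSOR_IDS = ["sensorA", "sensorB", "sensorC"]  # extend as new sensors are added

@task.branch
def branch_func():
    # Hypothetical Variable written outside the DAG by the user interaction,
    # e.g. a JSON list like ["sensorA", "sensorB"]
    selected = Variable.get("sensor_selection", default_var=[], deserialize_json=True)
    # Follow only task_ids that actually exist; an empty result skips every
    # sensor and, via none_failed_min_one_success, data_processing as well
    return [task_id for task_id in selected if task_id in SENSOR_IDS]

That way a stale selection can never reference a task_id that no longer exists in the DAG.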