I want to generate multiple Airflow sensors/operators in a loop, but I need to be able to access them one by one, because they have different dependencies. For example, task1 depends on operator1, operator2, and operator3, while task2 depends on operator4, operator1, and operator3, etc. Some of the dependencies overlap, but most of them differ.
Take a look at the sample graph I made.
I tried generating them in a loop and assigning the dependencies at the end of my DAG file, but I got an import error from Airflow saying it can't see the variable I referenced at the end of the file.
for table_script_id in [1, 2, 3, 4]:
    table_sensor = PythonSensor(
        task_id=f"table_sensor_{table_script_id}",
        python_callable=hello_world,
    )

# at the end of the DAG; got an import error this way
table_sensor_1 >> something_else1
table_sensor_2 >> [something_else2, something_else3]
But it didn't go as I hoped: every iteration rebinds the same table_sensor variable, so the sensors overwrite each other, as I understand it.
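To illustrate the rebinding (a minimal, Airflow-free sketch; the strings stand in for the sensor objects):

```python
sensors = {}
for i in [1, 2, 3, 4]:
    # stand-in for PythonSensor(...); each iteration rebinds the same name
    sensor = f"table_sensor_{i}"
    sensors[f"table_sensor_{i}"] = sensor

# after the loop, the bare name only refers to the last object created;
# the dict is the only place all four survive
print(sensor)           # table_sensor_4
print(sorted(sensors))  # all four keys
```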
Does anyone have an idea for a clean way to do this? Surely someone has done something like this before, but after two days of Googling I could not find any examples where dynamically generated tasks get different dependencies...
I don't know exactly what you are trying to do, but you can store the tasks in a dict keyed by task_id and apply the dependencies afterwards, something like below:
task_map = {}
for table_script_id in [1, 2, 3, 4]:
    table_sensor = PythonSensor(
        task_id=f"table_sensor_{table_script_id}",
        python_callable=hello_world,
    )
    task_map[f"table_sensor_{table_script_id}"] = table_sensor

# at the end of the DAG, look each task up by its task_id
# (indexing instead of .get() so a typo raises KeyError
# rather than silently yielding None)
task_map["table_sensor_1"] >> something_else1
task_map["table_sensor_2"] >> [something_else2, something_else3]
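The same pattern can be exercised without Airflow installed; here is a minimal sketch with a stand-in Task class whose `>>` overload mimics Airflow's dependency operator (the class and all task names are illustrative, not Airflow's API):

```python
class Task:
    """Stand-in for an Airflow operator; `>>` records downstream tasks."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # accept a single task or a list of tasks, like Airflow does
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)
        return other


# generate the tasks in a loop, keyed by task_id
task_map = {f"table_sensor_{i}": Task(f"table_sensor_{i}") for i in [1, 2, 3, 4]}

something_else1 = Task("something_else1")
something_else2 = Task("something_else2")
something_else3 = Task("something_else3")

# wire up different dependencies per generated task
task_map["table_sensor_1"] >> something_else1
task_map["table_sensor_2"] >> [something_else2, something_else3]
```

Since each dict entry keeps its own reference, nothing is overwritten and every generated task can get its own, different set of dependencies.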