python airflow directed-acyclic-graphs airflow-taskflow

Create Dynamic Airflow tasks with separate dependencies


I want to generate multiple Airflow sensors/operators in a loop, but I need to be able to access them one by one, because they have different dependencies. For example, task1 has dependencies on operator1, operator2, and operator3, while task2 has dependencies on operator4, operator1, and operator3, and so on. Some of the dependencies overlap, but most of them are different.

Take a look at the sample graph I made.

I tried generating them in a loop and assigning the dependencies at the end of my DAG file, but Airflow reported an import error saying it can't see the variable I referenced at the end of the file.

I tried something along these lines:

for table_script_id in [1, 2, 3, 4]:
    table_sensor = PythonSensor(
        task_id=f"table_sensor_{table_script_id}",
        python_callable=hello_world,
    )

# at the end of the DAG; got an import error this way
table_sensor_1 >> something_else1
table_sensor_2 >> [something_else2, something_else3]

But it didn't work as I had hoped: every sensor is assigned to the same table_sensor variable, so, as I understand it, each iteration overwrites the previous one, and names like table_sensor_1 are never defined.
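The rebinding described above can be reproduced in plain Python, without Airflow. In this sketch `FakeSensor` is a hypothetical stand-in for `PythonSensor`, and the `created` list stands in for the DAG: in Airflow, every sensor created inside the DAG context is still registered under its own task ID, so the sensors themselves are not lost. Only the Python name `table_sensor` ends up pointing at the last one, and `table_sensor_1` simply does not exist.

```python
class FakeSensor:
    """Hypothetical stand-in for PythonSensor; it only stores a task_id."""

    def __init__(self, task_id):
        self.task_id = task_id


# Stands in for the DAG, which keeps every task created in its context.
created = []

for table_script_id in [1, 2, 3, 4]:
    # Each iteration rebinds the single name `table_sensor`.
    table_sensor = FakeSensor(task_id=f"table_sensor_{table_script_id}")
    created.append(table_sensor)

print([sensor.task_id for sensor in created])  # all four sensors still exist
print(table_sensor.task_id)  # table_sensor_4: the name points at the last one

error_message = None
try:
    # Fails before `>>` runs: the name table_sensor_1 was never assigned.
    # In a DAG file, this NameError is what shows up as an import error.
    table_sensor_1 >> something_else1
except NameError as exc:
    error_message = str(exc)
    print("NameError:", error_message)
```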

Does anyone have an idea for an optimal way to do this? Has anyone done something like this? I'm sure! But after two days of Googling, I could not find any examples where dynamically generated tasks get different dependencies...


Solution

  • I'm not sure exactly what you are trying to do, but you can store the tasks in a dict keyed by task ID and then set each dependency by looking the task up, something like this:

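        # hello_world and something_else1..3 are defined elsewhere in the DAG file
        task_map = {}
        for table_script_id in [1, 2, 3, 4]:
            table_sensor = PythonSensor(
                task_id=f"table_sensor_{table_script_id}",
                python_callable=hello_world,
            )
            # keep a reference to every sensor, keyed by its task_id
            task_map[f"table_sensor_{table_script_id}"] = table_sensor

        # at the end of the DAG, look each sensor up by key and set its dependencies;
        # [] raises a clear KeyError on a typo, whereas .get() would silently return None
        task_map["table_sensor_1"] >> something_else1
        task_map["table_sensor_2"] >> [something_else2, something_else3]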
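When every generated task has its own set of dependencies, the same dict idea can be pushed one step further: describe the dependencies in one table and let a loop both create the tasks and wire them, instead of writing each `>>` line by hand. The sketch below uses the example from the question (task1 with operator1, operator2, operator3; task2 with operator4, operator1, operator3), assuming "has a dependency for" means the generated task runs first, the same direction as `table_sensor_1 >> something_else1`. `FakeTask` and `DOWNSTREAM` are hypothetical names; `FakeTask` is a minimal stand-in for an Airflow operator so that the sketch runs without Airflow installed.

```python
class FakeTask:
    """Hypothetical stand-in for an Airflow operator or sensor."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream_ids = set()
        self.upstream_ids = set()

    def __rshift__(self, other):
        # Mimics Airflow's `>>`: accept one task or a list of tasks.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream_ids.add(target.task_id)
            target.upstream_ids.add(self.task_id)
        return other


# Which operators run after each generated task (the question's example).
DOWNSTREAM = {
    "task1": ["operator1", "operator2", "operator3"],
    "task2": ["operator4", "operator1", "operator3"],
}

# Generate each task once and keep it in a dict keyed by task_id.
tasks = {task_id: FakeTask(task_id) for task_id in DOWNSTREAM}

# Shared operators are created exactly once, even if several tasks use them.
operator_ids = sorted({op for op_ids in DOWNSTREAM.values() for op in op_ids})
operators = {op_id: FakeTask(op_id) for op_id in operator_ids}

# Wire each task to its own operators by key lookup, not by variable name.
for task_id, op_ids in DOWNSTREAM.items():
    tasks[task_id] >> [operators[op_id] for op_id in op_ids]

for op_id, op in operators.items():
    print(op_id, "runs after", sorted(op.upstream_ids))
```

In a real DAG file, `FakeTask(...)` would be replaced by the actual sensors and operators created inside the DAG, while the `DOWNSTREAM` table and the wiring loop stay the same; Airflow's `>>` also accepts a list on the right-hand side. Airflow can also return an existing task by ID through `dag.get_task("table_sensor_1")`, but a dict you build yourself keeps the lookup explicit.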