Tags: python, airflow, airflow-2.x

How to remove a downstream or upstream task dependency in Airflow


Assuming we have the following two Airflow tasks in a DAG:

from airflow.operators.dummy import DummyOperator

t1 = DummyOperator(task_id='dummy_1')
t2 = DummyOperator(task_id='dummy_2')

we can specify the dependency between them in any of these equivalent ways:

# Option A
t1 >> t2

# Option B
t2.set_upstream(t1)

# Option C
t1.set_downstream(t2)

My question is whether there is any functionality that lets you remove downstream and/or upstream dependencies once they are defined.

I have a fairly big DAG where most of the tasks (and their dependencies) are generated dynamically. Once the tasks are created, I would like to re-arrange some of the dependencies and/or introduce some new tasks.

For example, suppose the tasks are first created and linked like this:

from airflow.operators.dummy import DummyOperator


t1 = DummyOperator(task_id='dummy_1')
t2 = DummyOperator(task_id='dummy_2')

t1 >> t2

I would then like to add a new task between the two, so that the chain becomes t1 >> t3 >> t2, and remove the old direct dependency t1 >> t2. Is this possible? Here is a fuller version of the setup:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

def function_that_creates_dags_dynamically():
    # Called inside the DAG context below, so these tasks belong to test_dag
    tasks = {
        't1': DummyOperator(task_id='dummy_1'),
        't2': DummyOperator(task_id='dummy_2'),
    }
    tasks['t1'] >> tasks['t2']
    return tasks

with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    tasks = function_that_creates_dags_dynamically()

    # Insert a new task between t1 and t2
    t3 = DummyOperator(task_id='dummy_3')
    tasks['t1'] >> t3
    t3 >> tasks['t2']
    # Somehow remove tasks['t1'] >> tasks['t2']

Solution

  • Technically, you can remove an existing dependency like so:

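    from datetime import datetime

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator  # Airflow 2.3+

    with DAG(dag_id="remove_dependency_example", start_date=datetime(2021, 1, 1)) as dag:
        t1 = EmptyOperator(task_id="t1")
        t2 = EmptyOperator(task_id="t2")
        t3 = EmptyOperator(task_id="t3")

        t1 >> t2
        t1 >> t3 >> t2

        # t1 >> t2 is stored on both tasks, so drop it from both sets
        t1.downstream_task_ids.remove("t2")
        t2.upstream_task_ids.remove("t1")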
    

    This leaves only the dependency chain t1 >> t3 >> t2.


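    Each task internally stores its dependencies in the sets upstream_task_ids and downstream_task_ids, which you can manipulate directly. Note that t1 >> t2 records the edge on both tasks ("t2" in t1.downstream_task_ids and "t1" in t2.upstream_task_ids), so remove it from both sets to keep them consistent. However, this feels like a workaround to me, and I'd advise generating only the correct dependencies in the first place if possible.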
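Since `t1 >> t2` records the edge in both `t1.downstream_task_ids` and `t2.upstream_task_ids`, a helper that removes it should touch both sets. Below is a minimal sketch of the rewiring from the question, not an Airflow API: `remove_dependency` and `insert_between` are hypothetical helper names, and `FakeTask` is a hypothetical stand-in that only mimics `task_id`, the two dependency sets and `>>`, so the sketch runs without Airflow installed. The helpers touch only those three attributes, so with real operators you would pass your DAG's tasks instead, e.g. `insert_between(tasks['t1'], t3, tasks['t2'])` inside the `with DAG(...)` block.

```python
class FakeTask:
    """Hypothetical stand-in for an Airflow operator (assumption, for this sketch only).

    It mimics task_id, the two dependency sets and the >> operator."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_task_ids = set()
        self.downstream_task_ids = set()

    def __rshift__(self, other):
        # Record the edge on both tasks, then return the right-hand task
        # so that chains like a >> b >> c work
        self.downstream_task_ids.add(other.task_id)
        other.upstream_task_ids.add(self.task_id)
        return other


def remove_dependency(upstream, downstream):
    """Hypothetical helper: drop the edge upstream >> downstream from both sets.

    discard() is used instead of remove() so a missing edge is a no-op."""
    upstream.downstream_task_ids.discard(downstream.task_id)
    downstream.upstream_task_ids.discard(upstream.task_id)


def insert_between(upstream, new_task, downstream):
    """Hypothetical helper: rewire upstream >> downstream into
    upstream >> new_task >> downstream."""
    remove_dependency(upstream, downstream)
    upstream >> new_task >> downstream


t1, t2, t3 = FakeTask("dummy_1"), FakeTask("dummy_2"), FakeTask("dummy_3")
t1 >> t2
insert_between(t1, t3, t2)

print(sorted(t1.downstream_task_ids))  # ['dummy_3']
print(sorted(t2.upstream_task_ids))    # ['dummy_3']
```

Using `discard` rather than `remove` makes the helper safe to call on dynamically generated tasks where you are not sure the edge exists; switch to `remove` if you would rather get a `KeyError` for an unexpected missing edge.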