Assuming we have the two following Airflow tasks in a DAG,
from airflow.operators.dummy import DummyOperator
t1 = DummyOperator(task_id='dummy_1')
t2 = DummyOperator(task_id='dummy_2')
we can specify dependencies as:
# Option A
t1 >> t2
# Option B
t2.set_upstream(t1)
# Option C
t1.set_downstream(t2)
My question is whether there is any functionality that lets you remove downstream and/or upstream dependencies once they are defined.
I have a fairly big DAG where most of the tasks (and their dependencies) are generated dynamically. Once the tasks are created, I would like to re-arrange some of the dependencies and/or introduce some new tasks.
For example, assuming that the functionality implements the following logic
from airflow.operators.dummy import DummyOperator
t1 = DummyOperator(task_id='dummy_1')
t2 = DummyOperator(task_id='dummy_2')
t1 >> t2
I would like to then be able to add a new task, add it in between the two tasks, and then remove the old dependency between t1
and t2
. Is this possible?
from airflow import DAG
from airflow.operators.dummy import DummyOperator
def function_that_creates_dags_dynamically():
tasks = {
't1': DummyOperator(task_id='dummy_1'),
't2': DummyOperator(task_id='dummy_2'),
}
tasks['t1'] >> tasks['t2']
return tasks
with DAG(
dag_id='test_dag',
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
tasks = function_that_creates_dags_dynamically()
t3 = DummyOperator(task_id='dummy_3')
tasks[t1] >> t3
t3 >> tasks[t2]
# Somehow remove tasks[t1] >> tasks[t2]
Technically, you can remove an existing dependency like so:
t1 = EmptyOperator(task_id="t1")
t2 = EmptyOperator(task_id="t2")
t3 = EmptyOperator(task_id="t3")
t1 >> t2
t1 >> t3 >> t2
t1.downstream_task_ids.remove("t2")
This results in only the dependency t1 >> t3 >> t2
:
Each task internally stores the dependencies in sets upstream_task_ids
and downstream_task_ids
, which you can manipulate. However, it feels like a workaround to me and I'd advise generating only the correct dependencies in the first place if possible.