My code looks like this:
def etl():
    for item in ['FIRST', 'SECOND', 'THIRD']:
        if item == 'FIRST':
            requests = ['Data1', 'Data3']
        else:
            requests = ['Data1']

        for data_name in requests:
            @task(task_id=f'{item}_{data_name}_task_a')
            def taska():
                a, b = some_func()
                vars_dict = {'a': a,
                             'b': b}
                return vars_dict

            @task(task_id=f'{item}_{data_name}_get_liveops_data')
            def taskb(vars_dict):
                some_other_func(vars_dict)
                return True

            if data_name == 'Data1':
                @task(task_id='last_task')
                def last_task(success):
                    dim_experiments.main()
                    return

            vars_dict = taska()
            success = taskb(vars_dict)
            last_task(success)

myc_dag = etl()
The goal is to have last_task depend on every taska and taskb, except for the taska and taskb that download the Data3 requests. I am not able to achieve this using the TaskFlow API.
The parallel dependency is occurring because the call to the last_task() TaskFlow function, and the task dependency set to it (implicitly via the TaskFlow API), happen within the same loop that calls the other tasks. Each call of a TaskFlow function creates a new task node. If last_task were pulled outside the loops and only the necessary dependencies were set inside the loops, you would achieve the desired structure.
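To see the "each call creates a new task node" behavior in isolation: Airflow deduplicates repeated TaskFlow calls by suffixing the generated task_id, so every call materializes its own node in the graph. A minimal sketch, assuming Airflow 2.x (the dag_id and task name here are just placeholders):

from datetime import datetime

from airflow.decorators import dag, task


@dag(dag_id="__calls__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def demo():
    @task
    def hello():
        ...

    hello()  # creates a task node "hello"
    hello()  # creates a second, independent task node "hello__1"

demo_dag = demo()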
Let's take a simplified version of your code as an example.
from datetime import datetime

from airflow.decorators import dag, task


@dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
    @task(task_id="last_task")
    def last_task(some_input=None):
        ...

    for item in ["a", "b"]:
        @task
        def taska():
            return {"a": "A", "b": "B"}

        @task
        def taskb(input):
            ...

        success = taskb(taska())
        last_task(success)

myc_dag = etl()
In the DAG above, the taska(), taskb(), and last_task() TaskFlow functions are all called, and their task dependencies set, within the loop. So we see 2 parallel paths, each ending in its own last_task node (taska >> taskb >> last_task, once per loop iteration).
To have last_task() become a shared downstream task of both paths, we need to pull the call to last_task() outside of the loop (so that the task node is only created once) but keep the task dependency between taskb() and last_task() intact. This can be done with a small refactor of the example:
@dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
    @task(task_id="last_task")
    def last_task(some_input=None):
        ...

    last_task = last_task()

    for item in ["a", "b"]:
        @task
        def taska():
            return {"a": "A", "b": "B"}

        @task
        def taskb(input):
            ...

        success = taskb(taska())
        success >> last_task

myc_dag = etl()
Notice that the last_task() TaskFlow function is now called outside of the loop that creates the other tasks. This ensures that the last_task task is only created once. The other change is to assign the last_task() call to a variable and use that variable to declare the task dependency to taskb() (similar to what you were doing with the success variable in your original code snippet). With these small changes we get 2 paths with a shared final task, last_task.
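Applied back to your original DAG, the same pattern also gives you the Data3 exception you asked for: call last_task() once before the loops, and only set the dependency when data_name is 'Data1'. This is a minimal sketch, assuming some_func, some_other_func, and dim_experiments behave as in your snippet:

from datetime import datetime

from airflow.decorators import dag, task


@dag(dag_id="my_dag", start_date=datetime(2021, 11, 1), schedule_interval=None)
def etl():
    @task(task_id='last_task')
    def last_task(success=None):
        dim_experiments.main()

    # Called once, outside the loops, so only one task node exists.
    last_task = last_task()

    for item in ['FIRST', 'SECOND', 'THIRD']:
        requests = ['Data1', 'Data3'] if item == 'FIRST' else ['Data1']

        for data_name in requests:
            @task(task_id=f'{item}_{data_name}_task_a')
            def taska():
                a, b = some_func()
                return {'a': a, 'b': b}

            @task(task_id=f'{item}_{data_name}_task_b')
            def taskb(vars_dict):
                some_other_func(vars_dict)
                return True

            success = taskb(taska())
            if data_name == 'Data1':
                # Only the Data1 branches feed the shared final task;
                # the Data3 branches end at their taskb.
                success >> last_task

myc_dag = etl()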