Tags: airflow, directed-acyclic-graphs, airflow-taskflow

How can I create a shared child between two tasks using the TaskFlow API?


My code looks like this:

def etl():
    for item in ['FIRST', 'SECOND', 'THIRD']:
        if item == 'FIRST':
            requests = ['Data1', 'Data3']
        else:
            requests = ['Data1']

        for data_name in requests:
            @task(task_id=f'{item}_{data_name}_task_a')
            def taska():
                a, b = some_func()
                vars_dict = {'a': a,
                             'b': b}
                return vars_dict

            @task(task_id=f'{item}_{data_name}_get_liveops_data')
            def taskb(vars_dict):
                some_other_func()
                return True

            if data_name == 'Data1':
                @task(task_id='last_task')
                def last_task(success):
                    dim_experiments.main()
                    return

            vars_dict = taska()
            success = taskb(vars_dict)
            last_task(success)


myc_dag = etl()

The DAG looks like this: [image: current DAG graph]

When it should look like this: [image: desired DAG graph]

The goal is to have last_task depend on taska and taskb, except for the taska and taskb instances that download the Data3 requests. I am not able to achieve this using the TaskFlow API.


Solution

  • The parallel dependency occurs because calling the last_task() TaskFlow function, and thereby implicitly setting the task dependency to it via the TaskFlow API, happens within the same loop that calls the other tasks. Each call of a TaskFlow function creates a new task node. If last_task were pulled outside the loops and only the necessary dependencies were set inside them, you would achieve the desired structure.

    Let's take a simplified version of your code as an example.

    from datetime import datetime
    from airflow.decorators import dag, task
    
    
    @dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
    def etl():
        @task(task_id="last_task")
        def last_task(some_input=None):
            ...
    
        for item in ["a", "b"]:
    
            @task
            def taska():
                return {"a": "A", "b": "B"}
    
            @task
            def taskb(input):
                ...
    
            success = taskb(taska())
            # Calling last_task() here creates a brand-new last_task node on every iteration.
            last_task(success)
    
    
    myc_dag = etl()
    

    In the DAG above, the taska(), taskb(), and last_task() TaskFlow functions are all called, and their task dependencies set, within the loop. Since no explicit task_id is passed here, Airflow deduplicates the repeated calls by suffixing the generated ids (taska and taska__1, and so on), so we see 2 parallel paths:

    [image: DAG graph with two parallel paths, each ending in its own last_task]

    To have last_task() become a shared downstream task of both paths, we need to pull the call to last_task() out of the loop (so that the task node is only created once) while keeping the task dependency between taskb() and last_task() intact. This can be done with a small refactor of the example:

    from datetime import datetime
    from airflow.decorators import dag, task


    @dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
    def etl():
        @task(task_id="last_task")
        def last_task(some_input=None):
            ...

        # Called once, outside the loop, so only a single task node is created.
        last = last_task()

        for item in ["a", "b"]:

            @task
            def taska():
                return {"a": "A", "b": "B"}

            @task
            def taskb(input):
                ...

            success = taskb(taska())
            # Declare the dependency explicitly rather than calling last_task() again.
            success >> last


    myc_dag = etl()
    

    Notice that the last_task() TaskFlow function is now called outside of the loop that creates the other tasks. This ensures that the last_task task is only created once. The other change is to assign the result of the last_task() call to a variable (last above) and use that variable to declare the task dependency from taskb() (similar to what you were doing with the success variable in your original code snippet). With these small changes we get 2 paths with last_task as a shared final task:

    [image: DAG graph with two paths converging on a single shared last_task]
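
    Applying the same pattern to the original scenario, the Data3 branch can simply skip the dependency declaration. The sketch below is only illustrative (it assumes Airflow 2.x, keeps the placeholder task bodies from the examples above, and infers the FIRST/Data3 mapping from the question's loops):

    from datetime import datetime
    from airflow.decorators import dag, task


    @dag(dag_id="__example__", start_date=datetime(2021, 11, 1), schedule_interval=None)
    def etl():
        @task(task_id="last_task")
        def last_task(some_input=None):
            ...

        # Created exactly once, outside both loops.
        last = last_task()

        for item in ["FIRST", "SECOND", "THIRD"]:
            requests = ["Data1", "Data3"] if item == "FIRST" else ["Data1"]

            for data_name in requests:

                @task(task_id=f"{item}_{data_name}_task_a")
                def taska():
                    return {"a": "A", "b": "B"}

                @task(task_id=f"{item}_{data_name}_task_b")
                def taskb(vars_dict):
                    ...

                success = taskb(taska())

                # Only the Data1 branches feed the shared last_task;
                # the Data3 branch is left without a downstream dependency.
                if data_name == "Data1":
                    success >> last


    myc_dag = etl()

    If last_task also needs the upstream results rather than just the ordering, collecting the success references in a list inside the loop and calling last_task(successes) once after the loop should work as well, since recent Airflow versions resolve XComArg references nested inside collections and set the corresponding dependencies.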