Airflow TaskFlow - run tasks in parallel


I wanted to try the new TaskFlow API and reached the point where I need two tasks to run in parallel.

With Airflow 1 I was used to doing something like:

    task_1 >> [task_2, task_3]
    [task_2, task_3] >> task_4

With TaskFlow, tasks are now called like Python functions instead of being defined as PythonOperator instances.

How can I express this list-style dependency with TaskFlow?

Thanks


Solution

  • If each task depends on the value returned by the previous task, pass those return values as arguments; TaskFlow infers the dependencies from them:

    from airflow.utils.dates import days_ago
    from airflow.decorators import task, dag
    
    
    @task
    def task_1():
        return 'first task'
    
    @task
    def task_2(value):
        return 'second task'
    
    @task
    def task_3(value):
        return 'third task'
    
    @task
    def task_4(value1, value2):
        return 'fourth task'
    
    default_args = {
        'owner': 'airflow',
        'start_date': days_ago(2),
    }
    
    
    @dag(dag_id='taskflow_stackoverflow', schedule_interval='@once', default_args=default_args, catchup=False)
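    def my_dag():
        op_1 = task_1()
        # op_2 and op_3 depend only on op_1, so Airflow can run them in parallel
        op_2 = task_2(op_1)
        op_3 = task_3(op_1)
        # op_4 waits for both branches to finish (fan-in)
        op_4 = task_4(op_2, op_3)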
    
    dag = my_dag()
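To make the data flow above concrete outside Airflow, here is a plain-Python analogy using a thread pool. It is not Airflow's executor; the function names only mirror the DAG above. task_1 runs first, task_2 and task_3 each receive its result and are submitted concurrently, and task_4 waits for both. In a real deployment the scheduler does this for you, and whether the two middle tasks actually overlap also depends on your executor and parallelism settings (for example, the SequentialExecutor runs one task at a time).

```python
from concurrent.futures import ThreadPoolExecutor


def task_1():
    return "first task"


def task_2(value):
    # Consumes task_1's return value, like an XComArg passed between TaskFlow tasks
    return f"second task (got: {value})"


def task_3(value):
    return f"third task (got: {value})"


def task_4(value1, value2):
    # Fan-in: needs the results of both parallel branches
    return f"fourth task (got: {value1} | {value2})"


def run_pipeline():
    op_1 = task_1()
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Fan-out: both branches depend only on op_1, so they can run concurrently
        fut_2 = pool.submit(task_2, op_1)
        fut_3 = pool.submit(task_3, op_1)
        # .result() blocks until each branch has finished
        op_2, op_3 = fut_2.result(), fut_3.result()
    return task_4(op_2, op_3)


if __name__ == "__main__":
    print(run_pipeline())
```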
    

    The bitshift (>>) syntax you mentioned is also supported, but then you won't get direct access to the XCom values of the upstream tasks:

    from airflow.utils.dates import days_ago
    from airflow.decorators import task, dag
    
    
    @task
    def task_1():
        return 'first task'
    
    @task
    def task_2():
        return 'second task'
    
    @task
    def task_3():
        return 'third task'
    
    @task
    def task_4():
        return 'fourth task'
    
    default_args = {
        'owner': 'airflow',
        'start_date': days_ago(2),
    }
    
    
    @dag(dag_id='taskflow_stackoverflow', schedule_interval='@once', default_args=default_args, catchup=False)
    def my_dag():
    
        op_1 = task_1()
        op_2 = task_2()
        op_3 = task_3()
        op_4 = task_4()
    
        # No values are passed between tasks, so the dependencies are declared explicitly
        op_1 >> [op_2, op_3]
        [op_2, op_3] >> op_4
    
    dag = my_dag()
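The list form works because of Python's reflected operators: a list has no __rshift__ method, so for [op_2, op_3] >> op_4 Python calls op_4.__rrshift__([op_2, op_3]). Airflow's dependency objects implement both __rshift__ and __rrshift__. The toy Node class below is a hypothetical stand-in, not Airflow's API; it records edges the same way and then groups tasks into levels whose members could run in parallel.

```python
class Node:
    """Hypothetical stand-in for an Airflow task; it only tracks dependencies."""

    def __init__(self, name):
        self.name = name
        self.upstream = set()

    def __rshift__(self, other):
        # self >> other, where other is a single Node or a list of Nodes
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            t.upstream.add(self)
        return other  # returning the right operand allows chaining a >> b >> c

    def __rrshift__(self, other):
        # [a, b] >> self: list has no __rshift__, so Python falls back to this
        sources = other if isinstance(other, list) else [other]
        for s in sources:
            self.upstream.add(s)
        return self


def parallel_levels(nodes):
    """Group nodes so that every node's upstreams sit in earlier levels."""
    done, levels = set(), []
    remaining = list(nodes)
    while remaining:
        ready = [n for n in remaining if n.upstream <= done]
        if not ready:
            raise ValueError("cycle detected")
        levels.append(sorted(n.name for n in ready))
        done.update(ready)
        remaining = [n for n in remaining if n not in done]
    return levels


op_1, op_2, op_3, op_4 = (Node(f"task_{i}") for i in range(1, 5))
op_1 >> [op_2, op_3]
[op_2, op_3] >> op_4
print(parallel_levels([op_1, op_2, op_3, op_4]))
```

Airflow's scheduler does not compute levels like this; with the default trigger rule it starts any task whose upstream tasks have all succeeded, which gives the same ordering for this fan-out/fan-in shape.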
    

    In practice you will probably mix the two styles, depending on what you want to achieve.