python, python-3.x, airflow

Airflow Task Group Execution Order


I am new to Airflow and am trying to understand when tasks run. I do not understand why task_3a runs immediately when I run this example.

How do I make this sample DAG run in this order:

  • Task 1
  • Task 2 if instructed to run
  • Task 3
  • Task 3a and Task 3b (great if these run in parallel)
  • Task 4
  • Task 5

import logging
from airflow.decorators import task, dag, task_group
from airflow.utils.dates import days_ago

@dag(
    dag_id='taskflow_conditional_dag',
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
)
def my_dag():

    logger = logging.getLogger("airflow.task")

    @task
    def task_1():
        logger.info("Task 1 running")
        return "run_task_"  # anything other than "run_task_2" makes the branch skip task_2

    @task.branch
    def branching_task(data):
        if data == "run_task_2":
            return "task_2"
        return "task_3"

    @task
    def task_2():
        # Task 2 logic here
        logger.info("Task 2 running")
        pass

    @task(
        trigger_rule="none_failed"
    )
    def task_3():
        logger.info("Task 3 running")
        pass

    @task(
        trigger_rule="none_failed"
    )
    def task_4():
        logger.info("Task 4 running")
        pass

    @task_group()
    def task_after_3_before_4_group():

        @task
        def task_3a():
            logger.info("Task 3a running.")
            pass

        @task
        def task_3b():
            logger.info("Task 3b running.")
            pass

        return task_3a() >> task_3b()


    @task(
        trigger_rule="none_failed"
    )
    def task_5():
        logger.info("Task 5 running")
        pass

    data = task_1()
    decision = branching_task(data)

    task_2_result = task_2()
    task_3_result = task_3()
    task_4_result = task_4()
    task_5_results = task_5()

    data >> decision
    decision >> task_2_result >> task_3_result >> task_after_3_before_4_group() >> task_4_result >> task_5_results

dag = my_dag()

Solution

  • I found an issue in the Airflow GitHub issue tracker that describes this problem: https://github.com/apache/airflow/issues/40196

    Once I updated the task group:

        @task_group()
        def task_after_3_before_4_group():
    
            @task
            def task_3a():
                logger.info("Task 3a running.")
                pass
    
            @task
            def task_3b():
                logger.info("Task 3b running.")
                pass
    
            return task_3a() >> task_3b()
    

    to remove the return value:

    
        @task_group()
        def task_after_3_before_4_group():
    
            @task
            def task_3a():
                logger.info("Task 3a running.")
                pass
    
            @task
            def task_3b():
                logger.info("Task 3b running.")
                pass
    
            task_3a() >> task_3b()
    

    all was well.
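Why the return value mattered, as I understand the Airflow 2.x @task_group decorator: if the decorated function returns something other than None, that value is handed back to the caller instead of the group, so the surrounding >> binds to it. Airflow's >> evaluates to its right-hand operand (that is what makes chains work), so task_3a() >> task_3b() is just task_3b's output. The chain task_3 >> group therefore only made task_3 upstream of task_3b, leaving task_3a with no upstream task, which is why it started immediately. Without a return, the decorator hands back the TaskGroup itself, and >> attaches to the group's roots and leaves. The sketch below is a toy model under those assumptions: Node, Group, group_call_result and build are hypothetical plain-Python stand-ins, not Airflow APIs. It also models the "parallel" variant where task_3a and task_3b have no >> between them.

```python
# Toy model, NOT Airflow's API: hypothetical classes that mimic how `>>`
# and a @task_group return value wire dependencies in Airflow 2.x.


class Wirable:
    def __rshift__(self, other):
        # `a >> b`: every leaf of a becomes upstream of every root of b.
        for leaf in leaves(self):
            for root in roots(other):
                root.upstream.add(leaf.name)
        # Like Airflow, evaluate to the right-hand side so chains work.
        return other


class Node(Wirable):
    """Stand-in for one task's output (an XComArg in Airflow)."""

    def __init__(self, name):
        self.name = name
        self.upstream = set()


class Group(Wirable):
    """Stand-in for a TaskGroup: edges attach to its roots and leaves."""

    def __init__(self, members):
        self.members = members


def roots(x):
    # Group members with no upstream task inside the group.
    if isinstance(x, Group):
        names = {m.name for m in x.members}
        return [m for m in x.members if not (m.upstream & names)]
    return [x]


def leaves(x):
    # Group members that no other member depends on.
    if isinstance(x, Group):
        feeding = set().union(*(m.upstream for m in x.members))
        return [m for m in x.members if m.name not in feeding]
    return [x]


def group_call_result(retval, members):
    # Hypothetical helper mirroring the decorator rule: a non-None return
    # value is handed back to the caller; otherwise the group itself is.
    return retval if retval is not None else Group(members)


def build(mode):
    """Wire task_3 >> group >> task_4 and report each task's upstream ids.

    mode: "return_chain" (original code), "no_return" (the fix),
    or "parallel" (task_3a and task_3b with no edge between them).
    """
    task_3, task_4 = Node("task_3"), Node("task_4")
    a, b = Node("task_3a"), Node("task_3b")
    retval = None
    if mode in ("return_chain", "no_return"):
        chained = a >> b  # adds task_3a -> task_3b, evaluates to b
        if mode == "return_chain":
            retval = chained  # like `return task_3a() >> task_3b()`
    out = group_call_result(retval, [a, b])
    task_3 >> out >> task_4
    return {n.name: sorted(n.upstream) for n in (task_3, a, b, task_4)}


for mode in ("return_chain", "no_return", "parallel"):
    print(mode, build(mode))
# return_chain: task_3a has no upstream, so it starts immediately
# no_return:    task_3 -> task_3a -> task_3b -> task_4
# parallel:     task_3 -> {task_3a, task_3b} -> task_4
```

In the real DAG, the parallel variant would correspond to calling task_3a() and task_3b() on separate lines inside the group, with no >> between them and no return; under the same wiring rules both become roots and leaves of the group, which is worth confirming in the Graph view.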