I am new to Airflow and am trying to understand when tasks are run. I do not understand why task_3a runs immediately when I run this example.
How do I make this sample DAG run in the order defined by its dependency chain (the last dependency line inside `my_dag` below)?
import logging
from airflow.decorators import task, dag, task_group
from airflow.utils.dates import days_ago
@dag(
    dag_id='taskflow_conditional_dag',
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
)
def my_dag():
    """Build a TaskFlow DAG that combines a branch, a task group and trigger rules.

    Intended dependency chain:
        task_1 >> branching_task >> task_2 >> task_3
        >> [task_3a >> task_3b] >> task_4 >> task_5
    """
    logger = logging.getLogger("airflow.task")

    @task
    def task_1():
        logger.info("Task 1 running")
        return "run_task_"

    @task.branch
    def branching_task(data):
        # NOTE(review): task_1 returns "run_task_", which never equals
        # "run_task_2", so as written this always selects "task_3" and
        # task_2 is skipped. Confirm that is intended.
        if data == "run_task_2":
            return "task_2"
        return "task_3"

    @task
    def task_2():
        # Task 2 logic here
        logger.info("Task 2 running")

    # none_failed: run when no upstream task failed, so task_3 still runs
    # after the branch has skipped task_2.
    @task(trigger_rule="none_failed")
    def task_3():
        logger.info("Task 3 running")

    @task(trigger_rule="none_failed")
    def task_4():
        logger.info("Task 4 running")

    @task_group()
    def task_after_3_before_4_group():
        @task
        def task_3a():
            logger.info("Task 3a running.")

        @task
        def task_3b():
            logger.info("Task 3b running.")

        # Deliberately NOT returned. ``task_3a() >> task_3b()`` evaluates to
        # task_3b's output; if the group function returns a value, the
        # @task_group decorator hands that value back instead of the
        # TaskGroup. Outer edges would then attach only to task_3b, leaving
        # task_3a with no upstream task, so it would start immediately.
        task_3a() >> task_3b()

    @task(trigger_rule="none_failed")
    def task_5():
        logger.info("Task 5 running")

    data = task_1()
    decision = branching_task(data)
    task_2_result = task_2()
    task_3_result = task_3()
    # Calling the group returns the TaskGroup itself (nothing is returned
    # inside it), so the edges below apply to both task_3a and task_3b.
    middle_group = task_after_3_before_4_group()
    task_4_result = task_4()
    task_5_results = task_5()

    # Passing ``data`` into branching_task already makes task_1 upstream of it;
    # the explicit edge is kept for readability.
    data >> decision
    (
        decision
        >> task_2_result
        >> task_3_result
        >> middle_group
        >> task_4_result
        >> task_5_results
    )


dag = my_dag()
I found the following issue on the Airflow GitHub issue tracker that describes this problem: https://github.com/apache/airflow/issues/40196
Once I changed the task group from this:
@task_group()
def task_after_3_before_4_group():
    # Original ("before") version of the group: task_3a starts too early.
    @task
    def task_3a():
        logger.info("Task 3a running.")
        pass
    @task
    def task_3b():
        logger.info("Task 3b running.")
        pass
    # ``task_3a() >> task_3b()`` evaluates to task_3b's output. Because the group
    # function returns it, the @task_group decorator returns that value instead
    # of the TaskGroup. Dependencies set on the group call then bind only to
    # task_3b, and task_3a is left with no upstream task, so it runs as soon as
    # the DAG run starts.
    return task_3a() >> task_3b()
to this version, which no longer returns a value:
@task_group()
def task_after_3_before_4_group():
    # Fixed ("after") version: nothing is returned, so calling the group
    # yields the TaskGroup itself, and dependencies set on it apply to every
    # task inside (task_3a waits for the upstream task, task_3b precedes the
    # downstream task).
    @task
    def task_3a():
        logger.info("Task 3a running.")
        pass
    @task
    def task_3b():
        logger.info("Task 3b running.")
        pass
    # Wire the order inside the group without returning the result.
    task_3a() >> task_3b()
everything ran in the expected order.