I have a DAG which contains one custom task, one @task.branch-decorated task, and one TaskGroup. Inside the TaskGroup there are multiple tasks that need to be triggered sequentially, depending on the outcome of the @task.branch.
PROCESS_BATCH_data_FILE = "batch_upload"
SINGLE_data_FILE_FIRST_OPERATOR = "validate_data_schema_task"
ENSURE_INTEGRITY_TASK = "provide_data_integrity_task"
PROCESS_SINGLE_data_FILE = "process_single_data_file_task"
default_args = {
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
    "trigger_rule": "none_failed",
}
default_args = update_default_args(default_args)

flow_name = "data_ingestion"

with DAG(
    flow_name,
    default_args=default_args,
    start_date=airflow.utils.dates.days_ago(0),
    schedule=None,
    dagrun_timeout=timedelta(minutes=180),
) as dag:
    update_status_running_op = UpdateStatusOperator(
        task_id="update_status_running_task",
    )

    @task.branch(task_id='check_payload_type')
    def is_batch(**context):
        # data = context["dag_run"].conf["execution_context"].get("data")
        if isinstance(data, dict):
            subdag = "validate_data_schema_task"
        elif isinstance(data, list):
            subdag = PROCESS_BATCH_data_FILE
        return subdag

    with TaskGroup(group_id='group1') as my_task_group:
        validate_schema_operator = ValidatedataSchemaOperator(task_id=SINGLE_data_FILE_FIRST_OPERATOR)
        ensure_integrity_op = EnsuredataIntegrityOperator(task_id=ENSURE_INTEGRITY_TASK)
        process_single_data_file = ProcessdataOperatorR3(task_id=PROCESS_SINGLE_data_FILE)

        validate_schema_operator >> ensure_integrity_op >> process_single_data_file

    update_status_finished_op = UpdateStatusOperator(
        task_id="update_status_finished_task",
        dag=dag,
        trigger_rule="all_done",
    )

    batch_upload = DummyOperator(
        task_id=PROCESS_BATCH_data_FILE
    )

    for batch in range(0, BATCH_NUMBER):
        batch_upload >> ProcessdataOperatorR3(
            task_id=f"process_data_task_{batch + 1}",
            previous_task_id=f"provide_data_integrity_task_{batch + 1}",
            batch_number=batch + 1,
            trigger_rule="none_failed_or_skipped",
        ) >> update_status_finished_op

    branch_task = is_batch()

    update_status_running_op >> branch_task
    branch_task >> batch_upload
    branch_task >> my_task_group >> update_status_finished_op
When I trigger this DAG I get the following error:
airflow.exceptions.AirflowException: 'branch_task_ids' must contain only valid task_ids. Invalid tasks found: {'validate_data_schema_task'}.
I don't understand why I get this error, because 'validate_data_schema_task' is defined at the top of the file. I have also tried hard-coding 'validate_data_schema_task' as the task_id, but that gives me the same error.
When referring to a task nested in a task group, you need to specify its task_id as "group_id.task_id".
This should work:
@task.branch(task_id='check_payload_type')
def is_batch(**context):
    # data = context["dag_run"].conf["execution_context"].get("data")
    if isinstance(data, dict):
        subdag = "group1.validate_data_schema_task"
    elif isinstance(data, list):
        subdag = "group1." + PROCESS_BATCH_data_FILE
    return subdag
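If you would rather not hard-code the "group1." prefix inside the branch callable, one option is to keep the group id in a constant next to the other task-id constants and build the branch targets from it. This is only a sketch: it reuses SINGLE_data_FILE_FIRST_OPERATOR and PROCESS_BATCH_data_FILE from the top of your DAG file, while GROUP_ID is a name introduced here purely for illustration.

from airflow.decorators import task

GROUP_ID = "group1"  # illustrative constant; must match the TaskGroup's group_id

@task.branch(task_id='check_payload_type')
def is_batch(**context):
    data = context["dag_run"].conf["execution_context"].get("data")
    if isinstance(data, dict):
        # Tasks inside a TaskGroup are addressed as "<group_id>.<task_id>"
        subdag = f"{GROUP_ID}.{SINGLE_data_FILE_FIRST_OPERATOR}"
    elif isinstance(data, list):
        # batch_upload is defined outside the group, so its id has no prefix
        subdag = PROCESS_BATCH_data_FILE
    else:
        raise ValueError(f"Unexpected payload type: {type(data)}")
    return subdag

You can also double-check which task ids Airflow actually registered with the CLI, e.g. airflow tasks list data_ingestion, which prints every task_id in the DAG including the group1. prefix for the grouped tasks.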