Dynamic DAG creation and dependencies

I am trying to implement a solution similar to the one in this question: dynamic dag creation based on dependencies from table

However, I am running into a couple of issues that I cannot seem to solve. Here is my code:

@dag(start_date=datetime(2024, 2, 3), schedule_interval='@daily', catchup=True, default_args=default_args,

def test_job():
    work_flow_task_group = DatabricksWorkflowTaskGroup(

    with work_flow_task_group:

        for task_id, details in task_list.items():

        for task_id, details in task_list.items():
            if task_up := details.get("input_table"):


This part works just fine and generates a DAG with 5 tasks whose task_ids are task1, task2, task3, task4, and task5:

for task_id, details in task_list.items():

However, when I introduce this bit of code:

for task_id, details in task_list.items():
            if task_up := details.get("input_table"):

And try to find task2, I get the following error:

Broken DAG: [/usr/local/airflow/dags/] Traceback (most recent call last):
  File "/usr/local/airflow/dags/", line 45, in test_job
  File "/usr/local/lib/python3.11/site-packages/airflow/models/", line 2591, in get_task
    raise TaskNotFound(f"Task {task_id} not found")
airflow.exceptions.TaskNotFound: Task task2 not found

I am not sure why this is happening. I am setting the task_ids explicitly for my tests, so I do not understand why the task cannot be found.


  • Not entirely sure about the Databricks part of the code, but in Airflow, when tasks are part of a TaskGroup, their ID is <group_id>.<task_id>, so a lookup for plain task2 will not match a task that lives inside a group. You can also set a parameter on the TaskGroup to disable this prefixing. Look in this link for prefix_group_id=False
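
    To make the naming rule concrete, here is a minimal sketch in plain Python that mimics how the prefixed ID is built. The helper qualified_task_id and the group name "my_workflow" are made up for illustration; they are not part of Airflow or of the code in the question, and I have not checked how DatabricksWorkflowTaskGroup names its group.

```python
from typing import Optional


def qualified_task_id(
    task_id: str,
    group_id: Optional[str] = None,
    prefix_group_id: bool = True,
) -> str:
    """Hypothetical helper (not an Airflow API).

    Mimics the TaskGroup naming rule: a task inside a group is
    registered as "<group_id>.<task_id>" unless prefixing is disabled.
    """
    if group_id and prefix_group_id:
        return f"{group_id}.{task_id}"
    return task_id


# "my_workflow" is an assumed group_id, used only for this example.
lookup_id = qualified_task_id("task2", group_id="my_workflow")
print(lookup_id)

# With prefixing disabled on the group, the plain task_id is used.
print(qualified_task_id("task2", group_id="my_workflow", prefix_group_id=False))
```

    If the lookup in the question is done with something like dag.get_task(...), passing the prefixed ID (or creating the group with prefix_group_id=False) would be the thing to try first.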