Dynamic DAG creation and dependencies


I am trying to implement a solution similar to the one in this question: dynamic dag creation based on dependencies from table

However, I am running into a couple of issues that I cannot seem to solve. Here is my code:

@dag(start_date=datetime(2024, 2, 3), schedule_interval='@daily', catchup=True, default_args=default_args,
     max_active_runs=1)
def test_job():
    work_flow_task_group = DatabricksWorkflowTaskGroup(
        ...
    )

    with work_flow_task_group:

        for task_id, details in task_list.items():
            templates.create_misc_notebook(task_id,
                                           details['notebookpath'])

        for task_id, details in task_list.items():
            if task_up := details.get("input_table"):
                work_flow_task_group.dag.get_task("task2")


test_job()

This part works just fine and generates a DAG with 5 tasks, with task_ids task1, task2, task3, task4, task5:

for task_id, details in task_list.items():
    templates.create_misc_notebook(task_id,
                                   details['notebookpath'])

However, when I introduce this bit of code:

for task_id, details in task_list.items():
    if task_up := details.get("input_table"):
        work_flow_task_group.dag.get_task("task2")

and try to look up task2, I get the following error:

Broken DAG: [/usr/local/airflow/dags/date.py] Traceback (most recent call last):
  File "/usr/local/airflow/dags/date.py", line 45, in test_job
    work_flow_task_group.dag.get_task("task2")
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 2591, in get_task
    raise TaskNotFound(f"Task {task_id} not found")
airflow.exceptions.TaskNotFound: Task task2 not found

I am not entirely sure why this is happening. I am setting the task_ids explicitly for my tests, so I do not see why it is unable to find the task.


Solution

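  • I am not entirely sure about the Databricks part of the code, but in Airflow, when tasks are part of a TaskGroup, their ID is <group_id>.<task_id>, so the task is registered under the prefixed ID rather than plain task2. You can also set prefix_group_id=False on the TaskGroup to disable this prefixing; see the Airflow TaskGroup documentation for that parameter.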
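
To make the naming rule concrete: by default, Airflow registers a task created inside a TaskGroup under the group's ID, a dot, and the task's own ID. The sketch below models that rule in plain Python so it runs without Airflow installed. The `child_id` helper here is an illustrative stand-in, the group ID `my_workflow` is a hypothetical value (the question elides the real one), and the `registered` dict is only a toy substitute for the DAG's task registry.

```python
def child_id(group_id: str, task_id: str, prefix_group_id: bool = True) -> str:
    """Model how a task ID is derived inside a TaskGroup (illustrative, not Airflow code)."""
    if prefix_group_id and group_id:
        return f"{group_id}.{task_id}"
    return task_id


# Hypothetical group ID; the real one is not shown in the question.
group_id = "my_workflow"

# Toy stand-in for the DAG's task registry: tasks keyed by their full ID.
registered = {child_id(group_id, f"task{i}"): f"notebook {i}" for i in range(1, 6)}

# Looking up the bare ID fails, which mirrors the TaskNotFound error.
print("task2" in registered)  # False

# Looking up the prefixed ID succeeds.
print(child_id(group_id, "task2") in registered)  # True

# With prefixing disabled, the ID stays as written.
print(child_id(group_id, "task2", prefix_group_id=False))  # task2
```

Applied to the question's code, this suggests looking up f"{work_flow_task_group.group_id}.task2" instead of "task2", assuming DatabricksWorkflowTaskGroup follows the standard TaskGroup prefixing behavior, which the answer above does not confirm.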