Tags: python, airflow, directed-acyclic-graphs

error: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again


When we pass DAG conf variables as parameters and create tasks in a loop inside a decorated function, we get the error below. Is there a way to fix it?

The error is raised at the line taskinfo >> completed.

with DAG(
  dag_id="test_dag",
  start_date=datetime(2022, 1, 24),
  schedule_interval=None,
  render_template_as_native_obj=True,
  default_args={},
  params={
    "param2": "[email protected]",
    "sourcedir": ['/home/arya/'],
    "timenum": 0
  },
  catchup=False
) as dag:

    @task
    def make_list(lst):
        context = get_current_context()
        srcdir = []
        for number in range(0, len(lst)):
            srcdir.append(("abc" + lst[number] + "xyz"))
            taskinfo = EmptyOperator(task_id=f"taskinfo_{number}")
            taskinfo.execute(context)

            completed = f"completed{number+1}"
            completed = DummyOperator(task_id=completed,trigger_rule='all_success')

            taskinfo >> completed

    make_list("{{ params.sourcedir }}")

Error: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again


Solution

  • The @task decorator converts a Python function into a task (an instance backed by PythonOperator), so the function body runs only when that task executes. The first problem is therefore that @task decorates a function whose job is to generate other tasks.

    The second problem is that this function knows nothing about your DAG: it creates 2 * len(lst) operators that do not belong to any DAG, so setting the relationship taskinfo >> completed raises this error.

    Here is a simple solution:

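    from datetime import datetime

    from airflow import DAG
    from airflow.models.param import Param
    from airflow.operators.empty import EmptyOperator

    # Tasks are created when the DAG file is parsed, while a Jinja string such as
    # "{{ params.sourcedir }}" is rendered only at run time, so the loop needs a real list.
    SOURCE_DIRS = ['/home/arya/']

    with DAG(
            dag_id="test_dag",
            start_date=datetime(2022, 1, 24),
            schedule_interval=None,
            render_template_as_native_obj=True,
            default_args={},
            params={
                "param2": "[email protected]",
                "sourcedir": SOURCE_DIRS,
                "timenum": 0,
                "number_param": Param(default=1, type="number"),
                "str_param": Param(default="1", type="string"),
            },
            catchup=False
    ) as dag:

        # Plain function (no @task): it runs at parse time and builds the tasks.
        # get_current_context() and .execute() are left out because no task
        # context exists at parse time; the scheduler runs the operators later.
        def make_list(lst, dag_instance):
            srcdir = []
            for number in range(len(lst)):
                srcdir.append("abc" + lst[number] + "xyz")
                # dag=dag_instance attaches each operator to the DAG.
                taskinfo = EmptyOperator(task_id=f"taskinfo_{number}", dag=dag_instance)
                completed = EmptyOperator(task_id=f"completed{number+1}", trigger_rule='all_success', dag=dag_instance)

                taskinfo >> completed

        make_list(SOURCE_DIRS, dag_instance=dag)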
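
The rule behind the error message can be illustrated with a small toy model in plain Python. This is only a sketch, not Airflow's implementation: ToyTask and ToyDagError are hypothetical names, and the code merely mimics the check that >> appears to perform, namely that at least one of the two tasks must already belong to a DAG, after which the other task is placed in the same DAG.

```python
class ToyDagError(Exception):
    """Stand-in for the exception Airflow raises (hypothetical name)."""


class ToyTask:
    """Toy stand-in for an operator; not Airflow's real class."""

    def __init__(self, task_id, dag=None):
        self.task_id = task_id
        self.dag = dag            # name of the owning DAG, or None
        self.downstream = []      # task_ids that run after this one

    def __rshift__(self, other):
        # Collect the DAGs that either side already belongs to.
        dags = {t.dag for t in (self, other) if t.dag is not None}
        if not dags:
            raise ToyDagError(
                "Tried to create relationships between tasks that don't have DAGs yet."
            )
        if len(dags) > 1:
            raise ToyDagError("Tasks belong to different DAGs.")
        # Exactly one DAG is known: both tasks end up in it.
        dag = dags.pop()
        self.dag = other.dag = dag
        self.downstream.append(other.task_id)
        return other


# Neither task has a DAG: the relationship is rejected.
try:
    ToyTask("taskinfo_0") >> ToyTask("completed1")
    orphan_error = None
except ToyDagError as exc:
    orphan_error = str(exc)

# One task has a DAG: the relationship works and the other task joins that DAG.
taskinfo = ToyTask("taskinfo_0", dag="test_dag")
completed = ToyTask("completed1")
taskinfo >> completed
```

In this model, giving a DAG to just one of the two tasks is enough, which matches the hint in the error message to set the DAG for at least one task.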