We are trying to pass DAG conf variables as parameters and create tasks in a loop inside a function decorated with `@task`, but we get the error below. Is there a way to fix it?
The error is raised at the line `taskinfo >> completed`:
# DAG from the question: it reproduces the "tasks that don't have DAGs yet"
# error at the `taskinfo >> completed` line below.
with DAG(
    dag_id="test_dag",
    start_date=datetime(2022, 1, 24),
    schedule_interval=None,
    render_template_as_native_obj=True,
    default_args={},
    params={
        "param2": "[email protected]",
        "sourcedir": ['/home/arya/'],
        "timenum": 0
    },
    catchup=False
) as dag:
    # @task converts make_list itself into a single task (a PythonOperator
    # instance), so the operators created in its body are not registered as
    # tasks of this DAG.
    @task
    def make_list(lst):
        context = get_current_context()
        # Collects "abc<item>xyz" strings; the list is never read afterwards.
        srcdir = []
        for number in range(0, len(lst)):
            srcdir.append(("abc" + lst[number] + "xyz"))
            # No dag= argument is given, so this operator belongs to no DAG.
            taskinfo = EmptyOperator(task_id=f"taskinfo_{number}")
            taskinfo.execute(context)
            # `completed` first holds the task id string, then is rebound to
            # the operator built from it.
            completed = f"completed{number+1}"
            completed = DummyOperator(task_id=completed,trigger_rule='all_success')
            # Reported failure point: neither operator has a DAG, so the
            # relationship cannot be created.
            taskinfo >> completed
    # The templated string is handed to the decorated task as its `lst` argument.
    make_list("{{ params.sourcedir }}")
Error: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again
The `@task` decorator converts a Python function into a task instance backed by `PythonOperator`, so the first problem is using `@task` on a method that is meant to generate tasks.
The second problem is that this method doesn't know anything about your DAG, so you create 2 * len(lst) tasks that do not belong to any DAG, and when you try to create the relationship between the tasks `taskinfo` and `completed`, you get this error.
Here is a simple solution:
from airflow.models.param import Param
# The task graph is built while the scheduler parses this file, not while a
# DAG run executes. Everything below therefore has to work without a task
# context and without rendered Jinja templates.
with DAG(
    dag_id="test_dag",
    start_date=datetime(2022, 1, 24),
    schedule_interval=None,
    render_template_as_native_obj=True,
    default_args={},
    params={
        "param2": "[email protected]",
        "sourcedir": ["/home/arya/"],
        "timenum": 0,
        "number_param": Param(default=1, type="number"),
        "str_param": Param(default="1", type="string"),
    },
    catchup=False,
) as dag:

    def make_list(lst, dag_instance):
        """Attach one ``taskinfo_<i> >> completed<i+1>`` pair per item of *lst*.

        This is a plain helper (not an ``@task``): it runs at parse time and
        only declares operators and their dependencies on *dag_instance*.

        Args:
            lst: Sequence of source directories known at parse time. Only its
                length determines how many task pairs are created.
            dag_instance: The DAG that the generated operators are bound to.

        Raises:
            TypeError: If *lst* is a string, e.g. an unrendered Jinja template
                such as ``"{{ params.sourcedir }}"``; iterating it would
                create one task pair per character.
        """
        if isinstance(lst, str):
            raise TypeError(
                "lst must be a sequence of directories, not a string; "
                "Jinja templates are not rendered at DAG-parse time"
            )
        for number, _ in enumerate(lst):
            # No get_current_context()/execute() here: there is no task
            # context at parse time, and operators are run by the scheduler
            # once they are part of the DAG.
            taskinfo = EmptyOperator(
                task_id=f"taskinfo_{number}",
                dag=dag_instance,
            )
            completed = EmptyOperator(
                task_id=f"completed{number + 1}",
                trigger_rule="all_success",
                dag=dag_instance,
            )
            taskinfo >> completed

    # Use the parameter's default value: per-run overrides (dag_run.conf) are
    # not available while the file is parsed. If the number of tasks must
    # depend on run-time values, use dynamic task mapping instead.
    make_list(dag.params["sourcedir"], dag_instance=dag)