I want to generate a set of tasks dynamically from DAG params. Specifically, I would like to pass a list of dates (strings) to my DAG and pass each date to a sub-DAG, triggering the sub-DAGs sequentially. In the example below I use DummyOperator() instead.
In my DAG, I have:
...
with DAG(
    ...,
    params={"date_list": Param(["2022-10-10"], type="array", ...)},
) as dag:
    t0 = None
    for idx, _date in enumerate("{{params.date_list}}"):
        t1 = DummyOperator(task_id=f"{idx}-{_date}")
        if t0 is not None:
            t0 >> t1
        t0 = t1
For instance, if I pass date_list=["2022-01-10", "2023-06-22"], I expect two tasks to be generated, like this:
"0-2022-01-10" >> "1-2023-06-22"
Unfortunately, this code does not work: {{params.date_list}} is not rendered at this point, so the loop receives the literal string "{{params.date_list}}" instead of the list.
What is the best way to achieve this, i.e., to dynamically generate tasks based on the input param? Any solution that gives me that list, so that I can use it in the for loop, is fine.
I would appreciate it if someone could shed some light on this.
UPDATE: Following @ozs's comment, I made the following changes:
def make_list():
    context = get_current_context()
    return context["params"]["date_list"]

@task
def generate_tasks(arg):
    return TriggerDagRunOperator(task_id=f"{arg}", trigger_dag_id="test_action", wait_for_completion=True)

generate_tasks = generate_tasks.expand(arg=make_list())
(generate_tasks)
In the other DAG, test_action.py, I have only a simple PythonOperator().
However, no secondary DAG is being triggered. Am I missing something?
You should use Airflow's dynamic task mapping to create tasks from a list of params. Your loop runs while Airflow is parsing the DAG file, not while a DAG run is executing, so the param values are not available yet.
In the example below, I trigger one DAG run per item in date_list and pass each run the value of its item. In this case I ran it with 2 dates, so 2 dag_runs were triggered, and each one received its item as the date value in the test_action DAG's conf.
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.models import Param
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="test_dag",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    params={
        "date_list": Param(["2022-10-10", "2023-10-10"], type="array"),
    },
    render_template_as_native_obj=True,
    tags=["test"],
) as dag:

    @task
    def make_list():
        # Runs at execution time and reads date_list from the conf the run was triggered with.
        context = get_current_context()
        # One conf dict per date; each becomes the conf of one triggered run.
        return [{"date": val} for val in context["dag_run"].conf["date_list"]]

    # partial() holds the shared arguments; expand() creates one mapped task per conf.
    test_actions = TriggerDagRunOperator.partial(
        task_id="test_action", trigger_dag_id="test_action", wait_for_completion=True
    ).expand(conf=make_list())

    (test_actions)
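The shape of the data that make_list hands to expand(conf=...) can be checked without Airflow. The sketch below is plain Python and only mirrors the list comprehension above; the helper name build_confs is hypothetical and is not part of Airflow.

```python
from typing import Dict, List


def build_confs(date_list: List[str]) -> List[Dict[str, str]]:
    """Hypothetical helper: one conf dict per date, same shape as make_list() returns."""
    return [{"date": val} for val in date_list]


confs = build_confs(["2022-10-10", "2023-10-10"])
for map_index, conf in enumerate(confs):
    # Each element would become the conf of one mapped TriggerDagRunOperator instance.
    print(map_index, conf)
```

With two dates the list has two elements, which matches the two triggered dag_runs described above.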
And the test_action DAG:
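from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import get_current_context

with DAG(
    dag_id="test_action",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    params={
        "date": ""
    },
    render_template_as_native_obj=True,
    tags=["test"],
) as dag:

    @task
    def print_arg():
        # Return the date passed in by the triggering DAG's conf.
        context = get_current_context()
        return context["dag_run"].conf["date"]

    # Call the decorated function so the task is actually added to the DAG.
    print_arg()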
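To see why the for loop in the question never produced one task per date: the DAG file is ordinary Python at parse time, so the Jinja string is not rendered there and enumerate simply walks its characters. A minimal sketch in plain Python (no Airflow needed; the variable names and the run-time value are assumptions for illustration):

```python
template = "{{params.date_list}}"

# At parse time this is just a str, so iterating yields single characters,
# not the dates that a rendered template would contain.
items = list(enumerate(template))
print(items[:3])

# The real list only exists at run time, once the params are resolved.
resolved = ["2022-01-10", "2023-06-22"]  # assumed run-time value
task_ids = [f"{idx}-{d}" for idx, d in enumerate(resolved)]
print(task_ids)
```

This is why the list has to be read inside a task, where the run's context is available, rather than in top-level DAG code.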