
Load DAG Params in Body of DAG


I am trying to generate a set of tasks dynamically using DAG Params. In detail, I would like to "pass" a list of dates (strings) to my DAG and pass each date to a sub-DAG that I trigger sequentially. In the following example, I use DummyOperator() instead.

In my DAG, I have:

...
with DAG(
    ...,
    params={"date_list": Param(["2022-10-10"], type="array")},
) as dag:

    t0 = None
    for idx, _date in enumerate("{{params.date_list}}"):
        t1 = DummyOperator(task_id=f"{idx}-{_date}")

        if t0 is not None:
            t0 >> t1
        t0 = t1

For instance, if I pass date_list=["2022-01-10", "2023-06-22"], I expect to see two tasks generated, like the following:

"0-2022-01-10" >> "1-2023-06-22"

Unfortunately, this code does not work: {{params.date_list}} is not rendered, and the literal string "{{params.date_list}}" is passed instead.
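A plain-Python sketch of what the loop actually sees: when the DAG file is parsed, the template is just a string, so enumerate() walks its characters (and "{" is not even a valid task_id character). Iterating over a real list, which only exists once a DAG run has been created, gives the IDs the question expects. The helper name task_ids is hypothetical; the dates are the example values from the question.

```python
def task_ids(items):
    # Build IDs in the question's "<index>-<value>" format.
    return [f"{idx}-{value}" for idx, value in enumerate(items)]

# At parse time the template is only a string, so this yields one ID per character.
template = "{{params.date_list}}"
print(task_ids(template)[:3])   # ['0-{', '1-{', '2-p']

# With the actual list (known only at run time) we get the expected chain.
dates = ["2022-01-10", "2023-06-22"]
print(" >> ".join(task_ids(dates)))   # 0-2022-01-10 >> 1-2023-06-22
```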

What is the best way to achieve the above, i.e., to dynamically generate tasks based on the input param? Any solution that helps me generate that list, and hence use it in the for loop, is fine.

I would appreciate it if someone could shed some light on this.

UPDATE: following @ozs's comment, I made the following changes:

    def make_list():
        context = get_current_context()
        return context["params"]["date_list"]

    @task
    def generate_tasks(arg):
        return TriggerDagRunOperator(task_id=f"{arg}", trigger_dag_id="test_action", wait_for_completion=True)

    generate_tasks = generate_tasks.expand(arg=make_list())
    (generate_tasks)

In the other DAG, test_action.py, I have only a simple PythonOperator().

However, no secondary DAG is being triggered. Am I missing something?


Solution

  • You should use Airflow dynamic task mapping to create tasks according to the list of params.

    Your loop runs while Airflow is parsing the DAG file, not while it is running an instance of the DAG, so templates such as {{params.date_list}} are never rendered at that point.

    To create tasks from run-time values, use Airflow dynamic task mapping (.partial() / .expand(), available since Airflow 2.3).

    In the example below, I trigger one run of the test_action DAG per item in date_list and pass each run the value of its item.

    In this case I ran it with 2 dates, so 2 DAG runs were triggered, and each one received its list item as the date param in the test_action DAG's conf.

    from datetime import datetime
    
    from airflow import DAG
    from airflow.decorators import task
    from airflow.models import Param
    from airflow.operators.python import get_current_context
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    
    with DAG(
            dag_id="test_dag",
            schedule_interval=None,
            start_date=datetime(2022, 1, 1),
            params={
                "date_list": Param(["2022-10-10", "2023-10-10"], type="array",)
            },
            render_template_as_native_obj=True,
            tags=["test"],
    ) as dag:
    
        @task
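        def make_list():
            context = get_current_context()
            # context["params"] holds the Param defaults, overridden by any conf passed
            # when the run is triggered, so this also works for a run started without
            # a conf (dag_run.conf["date_list"] would raise KeyError in that case).
            return [{"date": val} for val in context["params"]["date_list"]]
    
    
        # One mapped TriggerDagRunOperator instance is created per dict returned by
        # make_list; each dict becomes the conf of one test_action run.
        test_actions = TriggerDagRunOperator.partial(
            task_id="test_action", trigger_dag_id="test_action", wait_for_completion=True
        ).expand(conf=make_list())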
    
        (test_actions)
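Conceptually, .expand(conf=make_list()) fans out into one mapped TriggerDagRunOperator instance per element of the list returned by make_list, each identified by its map_index. The plain-Python model below only imitates that fan-out so the data flow is visible; it does not use Airflow, and make_conf_list and model_expand are hypothetical names.

```python
from __future__ import annotations

from typing import Any


def make_conf_list(date_list: list[str]) -> list[dict[str, str]]:
    # Same transformation as make_list in the DAG: one conf dict per date.
    return [{"date": val} for val in date_list]


def model_expand(task_id: str, confs: list[dict[str, Any]]) -> list[tuple[str, int, dict[str, Any]]]:
    # Imitates dynamic task mapping: one (task_id, map_index, conf) entry per element.
    return [(task_id, map_index, conf) for map_index, conf in enumerate(confs)]


for task_id, map_index, conf in model_expand("test_action", make_conf_list(["2022-10-10", "2023-10-10"])):
    print(f"{task_id}[{map_index}] triggers test_action with conf={conf}")
```

Mapped instances can run in parallel, subject to the usual concurrency limits, so the triggered runs are not inherently sequential. If sequential triggering matters, passing the BaseOperator argument max_active_tis_per_dag=1 to .partial() is one option to try for capping concurrency at one instance at a time; treat that as an assumption to test, since it does not by itself guarantee map_index order.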
    

    And the test_action DAG:

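    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task
    from airflow.operators.python import get_current_context

    with DAG(
            dag_id="test_action",
            schedule_interval=None,
            start_date=datetime(2022, 1, 1),
            params={
                "date": ""
            },
            render_template_as_native_obj=True,
            tags=["test"],
    ) as dag:
    
        @task
        def print_arg():
            context = get_current_context()
            # conf is the dict passed by TriggerDagRunOperator, e.g. {"date": "2022-10-10"}
            return context["dag_run"].conf["date"]
    
        # Call the decorated function to add the task to the DAG;
        # referencing print_arg without calling it creates no task.
        print_arg()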
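Why reading context["params"] can be more forgiving than dag_run.conf: with the default setting dag_run_conf_overrides_params = True, the values in a run's conf override the DAG's params defaults, so params still has a value when a run is started without a conf, while dag_run.conf["date"] raises KeyError. The sketch below only imitates that merge in plain Python; resolve_params is a hypothetical helper, and real Airflow additionally validates values against each Param's schema.

```python
from typing import Any, Dict, Optional


def resolve_params(
    defaults: Dict[str, Any],
    conf: Optional[Dict[str, Any]],
    conf_overrides_params: bool = True,
) -> Dict[str, Any]:
    # Start from the DAG-level params defaults.
    resolved = dict(defaults)
    # Mimic dag_run_conf_overrides_params: keys in conf replace the defaults.
    if conf_overrides_params and conf:
        resolved.update(conf)
    return resolved


defaults = {"date": ""}

# Triggered by TriggerDagRunOperator with a conf: the conf value wins.
print(resolve_params(defaults, {"date": "2022-10-10"})["date"])

# Triggered without a conf: params falls back to the default value.
print(repr(resolve_params(defaults, {})["date"]))
```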
    
