airflow, airflow-2.x, airflow-taskflow

Naming Airflow DAGs other than the Python callable when using the TaskFlow API


I am trying to create multiple DAGs using the TaskFlow API, each with a variable passed in that the tasks within the DAG can use.

For example, I am trying to use this code:

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule_interval=None, start_date=datetime(2021, 1, 1))
def dag_template(input_var):

    @task
    def printer_task(x):
        print(x)

    output_input_var = printer_task(input_var)

dag_1 = dag_template("string1")
dag_2 = dag_template(6)

Ideally this would create two DAGs with the IDs dag_1 and dag_2. One DAG would print the string "string1", the other 6. It almost works: the code creates a single DAG with the ID dag_template, which prints 6.

The documentation says that the DAG is named after the Python callable. Is it possible to override this?


Solution

  • I don't feel it's a very elegant solution, but it does do what I'm after.

    from airflow.decorators import dag, task
    from datetime import datetime
    
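    # Each entry pairs a DAG ID with the value passed into that DAG.
    config = [("dag_1", "string1"), ("dag_2", 6)]

    for dag_name, dag_input in config:

        # Passing dag_id overrides the default ID (the function name).
        @dag(dag_id=dag_name, schedule_interval=None, start_date=datetime(2021, 1, 1))
        def dag_template(input_var):
            @task
            def printer_task(x):
                print(x)

            output_input_var = printer_task(input_var)

        # Bind each DAG to its own module-level name so both stay reachable.
        globals()[dag_name] = dag_template(dag_input)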
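
To see why the original attempt ended up with a single DAG while the loop version yields two, here is a minimal plain-Python sketch of the naming behaviour. It does not import Airflow: `fake_dag` and `registry` are hypothetical stand-ins invented for illustration, assuming only what the question describes (the ID defaults to the decorated function's name unless `dag_id` is given, and a later DAG with the same ID replaces an earlier one).

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical stand-in for DAG discovery: maps a DAG ID to what was built.
registry: Dict[str, Any] = {}


def fake_dag(dag_id: Optional[str] = None) -> Callable:
    """Simplified stand-in for a @dag-style decorator (not Airflow's code)."""

    def decorator(func: Callable[[Any], Any]) -> Callable[[Any], str]:
        def factory(input_var: Any) -> str:
            # Fall back to the function name when no explicit ID is given.
            resolved_id = dag_id if dag_id is not None else func.__name__
            registry[resolved_id] = func(input_var)
            return resolved_id

        return factory

    return decorator


# Case 1: no explicit ID, as in the question.
@fake_dag()
def dag_template(input_var):
    return input_var


dag_template("string1")
dag_template(6)
# Both calls resolved to the same ID, so the second replaced the first.
collided = dict(registry)

# Case 2: explicit ID per iteration, as in the answer.
registry.clear()
for dag_name, dag_input in [("dag_1", "string1"), ("dag_2", 6)]:

    @fake_dag(dag_id=dag_name)
    def dag_template(input_var):
        return input_var

    dag_template(dag_input)
named = dict(registry)

print(collided)
print(named)
```

In the first case both calls share the ID taken from the function name, which matches the single `dag_template` DAG printing 6. In the second, each iteration supplies its own ID, so both entries survive.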