python, airflow, directed-acyclic-graphs, airflow-2.x

How do I use an imported function that creates and returns a DAG so that the Airflow UI can see it?


I've created a module named dag_template_module.py with a function that builds and returns a DAG from the arguments it is given. I want to reuse this definition for multiple DAGs that do the same thing for different sources (and therefore with different parameters). A simplified version of dag_template_module.py:

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator

def dag_template(
    dag_id: str,
    echo_message_1: str,
    echo_message_2: str
):
    @dag(
        dag_id=dag_id,
        schedule_interval="0 6 2 * *"
    )
    def dag_example():

        echo_1 = BashOperator(
            task_id='echo_1',
            bash_command=f'echo {echo_message_1}'
        )

        echo_2 = BashOperator(
            task_id='echo_2',
            bash_command=f'echo {echo_message_2}'
        )

        echo_1 >> echo_2

    dag = dag_example()

    return dag

I then created hello_world_dag.py, which imports the dag_template() function from dag_template_module.py and uses it to create a DAG:

from dag_template import dag_template

hello_world_dag = dag_template(
    dag_id='hello_world_dag',
    echo_message_1='Hello',
    echo_message_2='World'
)

I expected this DAG to be discovered by the Airflow UI, but that's not the case.

I also tried using globals() in hello_world_dag.py, as the documentation suggests, but that doesn't work either:

from dag_template import dag_template

hello_world_dag = 'hello_word_dag'
globals()[hello_world_dag] = dag_template(dag_id='hello_world_dag',
                                          echo_message_1='Hello',
                                          echo_message_2='World'
)

Solution

  • A couple of things:

    • The DAG you are attempting to create is missing the start_date parameter.
    • There is a nuance to how Airflow determines which Python files might contain a DAG definition: it only parses files whose contents include both "dag" and "airflow". hello_world_dag.py contains "dag" but never "airflow", so the DagFileProcessor doesn't attempt to parse the file and, therefore, never calls the dag_template() function.
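
The file filter described in the second point can be illustrated with a small stand-alone sketch. This is not Airflow's actual code: the function name might_contain_dag and the sample strings are hypothetical, and the case-insensitive comparison is an assumption about how the check behaves.

```python
# Simplified sketch of the keyword pre-filter; NOT Airflow's actual code.
# The function name and the sample strings below are hypothetical.

def might_contain_dag(source: str) -> bool:
    """Return True only if both keywords appear somewhere in the file text."""
    content = source.lower()  # assumption: the check ignores case
    return all(keyword in content for keyword in ("dag", "airflow"))


# hello_world_dag.py from the question: mentions "dag" but never "airflow".
original_source = (
    "from dag_template import dag_template\n"
    "\n"
    "hello_world_dag = dag_template(\n"
    "    dag_id='hello_world_dag',\n"
    "    echo_message_1='Hello',\n"
    "    echo_message_2='World'\n"
    ")\n"
)

# The same file with a marker comment prepended.
fixed_source = "# airflow dag\n" + original_source

print(might_contain_dag(original_source))  # False
print(might_contain_dag(fixed_source))     # True
```

Any occurrence of the two words is enough, including one inside a comment, which is why a marker comment at the top of the file works. Airflow also has a dag_discovery_safe_mode setting under [core]; disabling it should make Airflow consider every Python file in the DAGs folder, at the cost of more parsing work.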

    With both fixes applied, the following works on Airflow 2.5.0:

    dag_template_module.py

    from pendulum import datetime
    
    from airflow.decorators import dag
    from airflow.operators.bash import BashOperator
    
    
    def dag_template(dag_id: str, echo_message_1: str, echo_message_2: str):
        @dag(dag_id, start_date=datetime(2023, 1, 22), schedule=None)
        def dag_example():
    
            echo_1 = BashOperator(task_id="echo_1", bash_command=f"echo {echo_message_1}")
    
            echo_2 = BashOperator(task_id="echo_2", bash_command=f"echo {echo_message_2}")
    
            echo_1 >> echo_2
    
        return dag_example()
    

    hello_world_dag.py

    # airflow dag <- Make sure these words appear _somewhere_ in the file.
    
    from dag_template_module import dag_template
    
    dag_template(dag_id="dag_example", echo_message_1="Hello", echo_message_2="World")
    
    
