I've created a module named dag_template_module.py that returns a DAG built from the given arguments. I want to reuse this definition for multiple DAGs that do the same thing but read from different sources (and therefore take different parameters). A simplified version of dag_template_module.py:
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator


def dag_template(
    dag_id: str,
    echo_message_1: str,
    echo_message_2: str
):
    @dag(
        dag_id=dag_id,
        schedule_interval="0 6 2 * *"
    )
    def dag_example():
        echo_1 = BashOperator(
            task_id='echo_1',
            bash_command=f'echo {echo_message_1}'
        )
        echo_2 = BashOperator(
            task_id='echo_2',
            bash_command=f'echo {echo_message_2}'
        )
        echo_1 >> echo_2

    dag = dag_example()
    return dag
Now I've created hello_world_dag.py, which imports the dag_template() function from dag_template_module.py and uses it to create a DAG:
from dag_template import dag_template
hello_world_dag = dag_template(
    dag_id='hello_world_dag',
    echo_message_1='Hello',
    echo_message_2='World'
)
I expected this DAG to be discovered by the Airflow UI, but that's not the case.
I've also tried using globals() in hello_world_dag.py, as described in the documentation, but that doesn't work for me either:
from dag_template import dag_template
hello_world_dag = 'hello_word_dag'
globals()[hello_world_dag] = dag_template(
    dag_id='hello_world_dag',
    echo_message_1='Hello',
    echo_message_2='World'
)
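A couple of things:
1. The DAG you are creating is missing the start_date param.
2. Airflow only parses Python files whose contents include the keywords "airflow" and "dag". hello_world_dag.py doesn't contain both keywords, so the DagFileProcessor won't attempt to parse this file and, therefore, doesn't call the dag_template() function.
Adding these small tweaks, and running with Airflow 2.5.0: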
dag_template_module.py
from pendulum import datetime
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
def dag_template(dag_id: str, echo_message_1: str, echo_message_2: str):
    @dag(dag_id, start_date=datetime(2023, 1, 22), schedule=None)
    def dag_example():
        echo_1 = BashOperator(task_id="echo_1", bash_command=f"echo {echo_message_1}")
        echo_2 = BashOperator(task_id="echo_2", bash_command=f"echo {echo_message_2}")
        echo_1 >> echo_2

    return dag_example()
hello_world_dag.py
# airflow dag <- Make sure these words appear _somewhere_ in the file.
from dag_template_module import dag_template
dag_template(dag_id="dag_example", echo_message_1="Hello", echo_message_2="World")
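To make the keyword check more concrete, here is a rough sketch of the kind of pre-filter described above. The helper name looks_like_dag_file is hypothetical and this is not Airflow's own code; it only assumes that the check is a case-insensitive substring search for both words, which is how I understand the default behaviour.

```python
def looks_like_dag_file(source: str) -> bool:
    """Hypothetical helper approximating the keyword pre-filter.

    Assumption: a file is only parsed if its text contains both
    "airflow" and "dag", compared case-insensitively.
    """
    lowered = source.lower()
    return all(keyword in lowered for keyword in ("airflow", "dag"))


# The original hello_world_dag.py: mentions "dag" but never "airflow".
original = (
    "from dag_template import dag_template\n"
    "hello_world_dag = dag_template(\n"
    "    dag_id='hello_world_dag',\n"
    "    echo_message_1='Hello',\n"
    "    echo_message_2='World'\n"
    ")\n"
)

# The same file with the marker comment from the answer added on top.
fixed = "# airflow dag\n" + original

print(looks_like_dag_file(original))  # False: "airflow" is absent
print(looks_like_dag_file(fixed))     # True: both keywords are present
```

This would also explain why dag_template_module.py itself is never the problem: it imports from airflow, so both words already appear in it. As far as I know, this pre-filter is tied to the dag_discovery_safe_mode setting, so check your configuration if the behaviour differs.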