Is it possible to create an Airflow DAG programmatically, using just the REST API?
We have a collection of models; each model consists of a set of SQL scripts plus a JSON config file that defines the dependencies between them (example below).
The scripts are run through a Python job.py file that takes a script file name as a parameter.
Our models are updated by many individuals, so we need to update our DAG daily. What we have done is create a scheduled Python script that reads all the JSON files and, for each model, builds an in-memory DAG that executes the model's SQL scripts according to the dependencies defined in its JSON config file. What we want is to recreate that DAG programmatically within Airflow, so that it is visible in the UI, and then execute it, rerun failures, etc.
I did some research, and per my understanding Airflow DAGs can only be created by using decorators on top of Python files. Is there another approach I have missed, such as using the REST API?
Here is an example of one of our JSON configs:
{
  "scripts": {
    "Script 1": {
      "script_task": "job.py",
      "script_params": {
        "param": "script 1.sql"
      },
      "dependencies": [
        "Script 2",
        "Script 3"
      ]
    },
    "Script 2": {
      "script_task": "job.py",
      "script_params": {
        "param": "script 2.sql"
      },
      "dependencies": [
        "Script 3"
      ]
    },
    "Script 3": {
      "script_task": "job.py",
      "script_params": {
        "param": "script 3.sql"
      },
      "dependencies": []
    }
  }
}
Airflow DAGs are Python objects, so you can create a DAG factory and use any external data source (a JSON/YAML file, a database, an NFS volume, ...) as the source for your DAGs.
Here are the steps to achieve your goal:

1. Create a Python script in your dags folder that reads your JSON config files.
2. For each config, call a factory method create_dag(config_dict) that builds the DAG object and returns it.
3. Register every created DAG as a module-level variable: globals()[<dag id>] = create_dag(dag_config) # this step is very important to persist the created dag and add it to the dag bag
So, without going into the details of your JSON files: if you already have a script which creates the DAGs in memory, try to apply those steps, and you will find the created DAGs in the metadata database and the UI.
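For illustration, here is a minimal sketch of such a factory file. The file name generate_dags.py, the dag_configs/ directory, the dag_factory module, and deriving the dag id from the file name are all assumptions for the example, not something from your setup:

# dags/generate_dags.py -- hypothetical file name; it must live in the Airflow dags folder
import json
from glob import glob
from pathlib import Path

from dag_factory import create_dag  # assumed helper module holding the create_dag shown below

for config_path in glob("/opt/airflow/dag_configs/*.json"):  # assumed location of the JSON configs
    with open(config_path) as f:
        dag_conf = json.load(f)
    dag_id = Path(config_path).stem  # assumption: derive the dag id from the config file name
    # assigning the DAG object to a module-level name is what adds it to the dag bag
    globals()[dag_id] = create_dag(dag_id, dag_conf)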
Here are some tips for the create_dag method: scripts can also be an array of objects instead of an object keyed by script name, as long as you adapt the loop. In create_dag I will try to simplify the code (according to what I understood from your JSON file):

from datetime import datetime
from json import loads
from airflow import DAG
from airflow.operators.bash import BashOperator
def create_dag(dag_id, dag_conf) -> DAG:
    scripts = dag_conf["scripts"]
    tasks_dict = {}
    dag = DAG(dag_id=dag_id, start_date=datetime(2022, 1, 1), schedule_interval=None)  # configure your dag
    for script_name, script_conf in scripts.items():
        # build the command-line parameters; quote the values because the file names contain spaces
        params = " ".join(f"{k}='{v}'" for k, v in script_conf["script_params"].items())
        task = BashOperator(
            task_id=script_name.replace(" ", "_"),  # task ids cannot contain spaces
            bash_command=f"python {script_conf['script_task']} {params}",
            dag=dag,
        )
        tasks_dict[script_name] = {
            "task": task,
            "dependencies": script_conf["dependencies"],
        }
    # wire the dependencies once all the tasks have been created
    for task_conf in tasks_dict.values():
        for dependency in task_conf["dependencies"]:
            task_conf["task"] << tasks_dict[dependency]["task"]  # if you mean the inverse, you can replace << by >>
    return dag
# Note: don't put this under "if __name__ == '__main__'": Airflow imports this module
# when parsing the dags folder, so the DAG has to be created at import time.
# Create a loop here if you have multiple files.
# You can load the files from git or S3; I use local storage for testing.
with open("dag.json", "r") as dag_conf_file:
    dag_conf_dict = loads(dag_conf_file.read())
dag_id = "test_dag"  # read it from the file
globals()[dag_id] = create_dag(dag_id, dag_conf_dict)
P.S.: if you create a large number of DAGs in the same script (one script that processes multiple JSON files), you may run into performance issues, because the Airflow scheduler and workers will re-parse the script for every task operation; you can mitigate this with the "magic loop" trick or the new syntax added in 2.4.
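As a rough sketch of that 2.4 approach (assuming Airflow >= 2.4, the create_dag factory from above, and the same hypothetical dag_configs/ layout), get_parsing_context() exposes which DAG a worker is currently parsing, so you can skip building all the others:

import json
from glob import glob
from pathlib import Path

from airflow.utils.dag_parsing_context import get_parsing_context

# create_dag is the factory function defined earlier in this answer
current_dag_id = get_parsing_context().dag_id  # None when the scheduler parses the whole folder

for config_path in glob("/opt/airflow/dag_configs/*.json"):  # assumed config location
    dag_id = Path(config_path).stem
    if current_dag_id is not None and current_dag_id != dag_id:
        continue  # a worker only needs the DAG it is executing, so skip the others
    with open(config_path) as f:
        globals()[dag_id] = create_dag(dag_id, json.load(f))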