python, airflow, airflow-api

Can I create an Airflow DAG dynamically using the REST API?


Is it possible to create an Airflow DAG programmatically, using just the REST API?

Background

We have a collection of models; each model consists of:

  • A collection of SQL files that need to be run for the model
  • A JSON file that defines the dependencies between the SQL files

The scripts are run through a Python job.py file that takes a script file name as a parameter.

Our models are updated by many individuals, so we need to update our DAG daily. What we have done is create a scheduled Python script that reads all the JSON files and, for each model, builds an in-memory DAG that executes the model's SQL scripts according to the dependencies defined in the JSON config. What we want is to be able to recreate that DAG programmatically within Airflow, see it in the UI, execute it, rerun failures, etc.

I did some research, and my understanding is that Airflow DAGs can only be created in Python files (for example with the @dag decorator). Is there another approach I missed that uses the REST API?

Here is an example of a JSON we have:

{
    "scripts": {
        "Script 1": {
            "script_task": "job.py",
            "script_params": {
                "param": "script 1.sql"
            },
            "dependencies": [
                "Script 2",
                "Script 3"
            ]
        },
        "Script 2": {
            "script_task": "job.py",
            "script_params": {
                "param": "script 2.sql"
            },
            "dependencies": [
                "Script 3"
            ]
        },
        "Script 3": {
            "script_task": "job.py",
            "script_params": {
                "param": "script 3.sql"
            },
            "dependencies": []
        }
    }
}
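
To clarify what "dependencies" means: a script runs only after all of its dependencies have finished, so the execution order is a topological ordering of that graph. A minimal illustration (not our actual job.py runner; the file name is made up) using Python's graphlib:

from json import load
from graphlib import TopologicalSorter  # Python 3.9+

with open("model.json") as f:  # made-up file name
    scripts = load(f)["scripts"]

# map each script to the scripts that must run before it
graph = {name: conf["dependencies"] for name, conf in scripts.items()}
print(list(TopologicalSorter(graph).static_order()))
# ['Script 3', 'Script 2', 'Script 1']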

Solution

  • Airflow DAGs are Python objects, so you can create a DAG factory and use any external data source (a JSON/YAML file, a database, an NFS volume, ...) as the source for your DAGs.

    Here are the steps to achieve your goal:

    1. Create a Python script in your dags folder (assume its name is dags_factory.py)
    2. Create a Python class or method which returns a DAG object (assume it is a method and it is defined as create_dag(config_dict))
    3. At module level (not under an if __name__ == '__main__' guard), load your file (or any other external data source), loop over the DAG configs, and for each DAG:
    # this step is very important to persist the created dag and add it to the DagBag
    globals()[dag_id] = create_dag(dag_config)
    

    So without going into the details of your job file, if you already have a script which creates the DAGs in memory, try to apply those steps, and you will find the created DAGs in the metadata database and the UI.

    Here are some tips:

    • Airflow re-runs the DAG file processor every X seconds (configurable), so there is no need to use an API; instead, you can upload your config files to S3/GCS or a git repository and load them in the factory script before calling the create_dag method (see the sketch after the code below).
    • Try to improve your JSON schema; for example, scripts could be an array.
    • For the create_dag method, I tried to simplify the code (based on what I understood from your JSON file):
    from datetime import datetime
    from json import loads
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    
    def create_dag(dag_id, dag_conf) -> DAG:
        scripts = dag_conf["scripts"]
        tasks_dict = {}
        dag = DAG(dag_id=dag_id, start_date=datetime(2022, 1, 1), schedule_interval=None)  # configure your dag
        for script_name, script_conf in scripts.items():
            # build the "key='value'" parameters passed to job.py
            params = " ".join(f"{k}='{v}'" for k, v in script_conf["script_params"].items())
            task = BashOperator(
                task_id=script_name.replace(" ", "_"),  # task_id must not contain spaces
                bash_command=f"python {script_conf['script_task']} {params}",
                dag=dag
            )
            tasks_dict[script_name] = {
                "task": task,
                "dependencies": script_conf["dependencies"]
            }
        for task_conf in tasks_dict.values():
            for dependency in task_conf["dependencies"]:
                task_conf["task"] << tasks_dict[dependency]["task"]  # the dependency runs first; if you mean the inverse, replace << by >>
        return dag
    
    
    # keep this at module level: Airflow registers the DAG when it imports the file,
    # so an if __name__ == '__main__' guard would prevent the DAG from being created
    # create a loop if you have multiple files
    # you can load the files from git or S3, I use local storage for testing
    with open("dag.json", "r") as dag_conf_file:
        dag_conf_dict = loads(dag_conf_file.read())
    dag_id = "test_dag"  # read it from the file
    globals()[dag_id] = create_dag(dag_id, dag_conf_dict)
    
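    For the S3 option, here is a rough sketch of what the module-level part of dags_factory.py could look like when the configs live in a bucket (the bucket and prefix names are placeholders, it reuses the create_dag method above, and it assumes boto3 is available on the scheduler); the same idea works for GCS or a git checkout:

    import boto3
    from json import loads
    from pathlib import Path
    
    s3 = boto3.client("s3")
    bucket, prefix = "my-models-bucket", "model-configs/"  # placeholder names
    
    # create one DAG per JSON config found under the prefix
    for obj in s3.list_objects_v2(Bucket=bucket, Prefix=prefix).get("Contents", []):
        if not obj["Key"].endswith(".json"):
            continue
        body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
        dag_conf_dict = loads(body)
        dag_id = Path(obj["Key"]).stem  # derive the dag id from the file name
        globals()[dag_id] = create_dag(dag_id, dag_conf_dict)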

    P.S.: if you create a big number of DAGs in the same script (one script processing multiple JSON files), you may have performance issues, because the Airflow scheduler and workers re-run the whole script for each task operation; you will need to improve it using the magic loop pattern or the new syntax added in 2.4 (get_parsing_context).
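
    For that optimization, here is a rough sketch assuming Airflow 2.4+ (where get_parsing_context is available); all_dag_configs stands for the configs loaded as shown above:

    from airflow.utils.dag_parsing_context import get_parsing_context
    
    current_dag_id = get_parsing_context().dag_id  # None while the whole file is being parsed
    
    for dag_id, dag_conf_dict in all_dag_configs.items():  # placeholder: dag_id -> config dict
        if current_dag_id is not None and current_dag_id != dag_id:
            continue  # a worker only needs the DAG it is executing; skip generating the others
        globals()[dag_id] = create_dag(dag_id, dag_conf_dict)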