Tags: google-cloud-platform, airflow, airflow-scheduler

How does Airflow load/update the DagBag from the dags home folder on Google Cloud Platform?


Please do not downvote my question. If needed, I will update and correct my wording. I have done my homework and research; I am a little new to this, so I am trying to understand it.

I would like to understand how Airflow on Google Cloud Platform picks up changes from the dags home folder and reflects them in the UI. Please also help me with my DAG setup script. I have read many answers along with books; the book link is here.

I tried to figure out the answer from page 69, which says:

3.11 Scheduling & Triggers: The Airflow scheduler monitors all tasks and all DAGs, and triggers the task instances whose dependencies have been met. Behind the scenes, it monitors and stays in sync with a folder for all DAG objects it may contain, and periodically (every minute or so) inspects active tasks to see whether they can be triggered.

My understanding from this book is that the scheduler regularly picks up changes from the dags home folder. (Is that correct?)

I also read multiple answers on Stack Overflow, and I found this one useful: Link

But that answer still does not explain which process creates/updates the DagBag from script.py in the dags home folder, or how changes are detected.

Please help me with my DAG setup script. We have created a generic Python script that dynamically creates DAGs by reading/iterating over config files.

Below is the directory structure:

/dags/workflow/
/dags/workflow/config/dag_a.json
/dags/workflow/config/dag_b.json
/dags/workflow/task_a_with_single_operator.py
/dags/workflow/task_b_with_single_operator.py
/dags/dag_creater.py

The execution flow of dag_creater.py is as follows:

 1. Iterate over the dags/workflow/config folder, read each config JSON
    file, and get the dag_id variable.
 2. Create Parent_dag = DAG(dag_id=dag_id, start_date=start_date,
    schedule_interval=schedule_interval, default_args=default_args,
    catchup=False).
 3. Read the tasks and dependencies of that dag_id from the config JSON
    file (example: [[a,[]],[b,[a]],[c,[b]]]) and code them as
    task_a >> task_b >> task_c.

This way the DAGs are created. Everything works fine: the DAGs are visible in the UI and run fine.
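
For illustration only, here is a minimal sketch of the pattern my script follows (this is not my real code; the operator, config keys, and dates are placeholders):

    # dags/dag_creater.py -- illustrative sketch, not the real script
    import json
    import os
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator  # placeholder operator

    CONFIG_DIR = os.path.join(os.path.dirname(__file__), "workflow", "config")
    default_args = {"owner": "airflow", "retries": 1}

    for file_name in os.listdir(CONFIG_DIR):
        if not file_name.endswith(".json"):
            continue
        with open(os.path.join(CONFIG_DIR, file_name)) as f:
            config = json.load(f)

        dag = DAG(
            dag_id=config["dag_id"],
            start_date=datetime(2020, 1, 1),
            schedule_interval=config.get("schedule_interval", "@daily"),
            default_args=default_args,
            catchup=False,
        )

        # Build tasks, then wire dependencies like [["a", []], ["b", ["a"]], ["c", ["b"]]]
        tasks = {t_id: DummyOperator(task_id=t_id, dag=dag)
                 for t_id, _ in config["tasks"]}
        for t_id, upstream_ids in config["tasks"]:
            for upstream_id in upstream_ids:
                tasks[upstream_id] >> tasks[t_id]

        # Module-level reference so the scheduler's DagBag discovers each DAG
        globals()[config["dag_id"]] = dag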

But the problem is that my DAG-creation script runs every time. Even in each task's logs I see logs from all the DAGs. I expected this script to run only once, just to fill the entries in the metadata database. I am unable to understand why it runs every time. Please help me understand the process.

I know airflow initdb is run only once, when we first set up the metadata database, so that is not what is doing this update all the time.

  • Is it the scheduler heartbeat that is updating everything?
  • Is my setup correct?

Please note: I can't post real code because of a restriction from my organization. However, if asked, I will provide more information.


Solution

  • The Airflow scheduler runs continuously in the Airflow runtime environment and is the main component responsible for monitoring the DAG folder for changes and triggering the relevant DAG tasks residing in that folder. The main settings for the scheduler service can be found in the airflow.cfg file, essentially the heartbeat and file-processing intervals, which determine how often DAG files are re-parsed and tasks are scheduled.
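
    The options most relevant here live in the [scheduler] section of airflow.cfg. The values below are only illustrative (roughly the Airflow 1.10 defaults), not a recommendation:

        [scheduler]
        # How often (in seconds) the scheduler scans the DAGs folder for new files
        dag_dir_list_interval = 300
        # Minimum interval (in seconds) between re-parses of the same DAG file
        min_file_process_interval = 0
        # Scheduler heartbeat; drives how often scheduling decisions are made
        scheduler_heartbeat_sec = 5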

    However, how a particular task is actually executed is determined by the executor model in the Airflow configuration.

    To make DAGs available to the Airflow runtime environment, GCP Composer uses Cloud Storage with a specific folder structure: any object with a *.py extension that arrives in the /dags folder is synchronized to the environment and inspected to verify whether it contains a DAG definition.
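
    DAG files are typically uploaded to that bucket with gsutil or with gcloud composer environments storage dags import. Purely as an illustration, the same upload can be done with the Cloud Storage Python client (the bucket name below is a placeholder for your environment's bucket):

        # Illustrative only: push a generated DAG file into the Composer bucket's
        # dags/ prefix, from where Composer syncs it into the Airflow environment.
        from google.cloud import storage

        client = storage.Client()
        bucket = client.bucket("us-central1-my-env-1234-bucket")  # placeholder name
        bucket.blob("dags/dag_creater.py").upload_from_filename("dags/dag_creater.py")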

    If you expect to run a DAG-generating script within the Airflow runtime, then for this particular use case I would advise you to look at the PythonOperator, using it in a separate DAG to invoke and execute your custom generic Python code on a schedule that guarantees it runs only once. You can check out this Stack thread for implementation details.
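
    A minimal sketch of that approach, assuming Airflow 1.x import paths and a placeholder callable standing in for your generic code:

        # Hypothetical one-shot DAG: runs the DAG-generation logic exactly once
        # instead of re-executing it on every parse of dag_creater.py.
        from datetime import datetime

        from airflow import DAG
        from airflow.operators.python_operator import PythonOperator


        def generate_dags():
            # Placeholder for your custom generic code (e.g. reading the JSON
            # configs and persisting whatever metadata your DAG factory needs).
            pass


        with DAG(
            dag_id="dag_generator_one_shot",
            start_date=datetime(2020, 1, 1),
            schedule_interval="@once",  # schedules a single run
            catchup=False,
        ) as dag:
            PythonOperator(task_id="generate_dags", python_callable=generate_dags)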