Tags: python, kubernetes, airflow, kubernetes-helm, airflow-webserver

Airflow: Outdated DAGs persisting in Web UI after gitSync


I am running Airflow on Kubernetes, using git-sync to sync DAGs from a git repository. I can import DAGs successfully, but old versions of my DAGs are persisting in the Airflow UI alongside the new ones.

Example:

In my remote repository I have the file dags/dag_test.py, with the following contents:

from datetime import datetime

from airflow import DAG
# EmptyOperator (Airflow 2.3+) replaces the deprecated DummyOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 0,
}

# 'schedule' supersedes 'schedule_interval' as of Airflow 2.4
dag = DAG(dag_id='my_dag3', default_args=default_args, catchup=False, schedule='@once')

start = EmptyOperator(task_id='start', dag=dag)
end = EmptyOperator(task_id='end', dag=dag)

start >> end

The DAG my_dag3 is picked up by Airflow, is visible in the UI, and runs successfully. So far, so good.

Now I push a change to my master branch that renames the DAG to dag_id='my_dag4', with no other changes to the file or the filename. This should effectively define a new DAG, dag4, and remove dag3.

When this change reaches the UI, dag3 remains visible for a short period, and the replacement dag4 appears alongside it. Both DAGs can still be run.


Furthermore, when I look at the code of dag3, it now shows the code after the change, i.e. code that no longer matches the definition of the DAG.


Both DAGs remain visible for a short while until dag3 eventually disappears. Even though the overlap is brief, it could lead to nasty bugs from running outdated code. Why does this happen, and how can I ensure the Airflow UI only displays the DAGs defined in the current snapshot of the repository?

My deployment:
  • Airflow chart: 8.6.1 (https://github.com/airflow-helm/charts)
  • Airflow image: 2.4.3-python3.8
  • Running locally in minikube


Solution

  • Now, I push a change to my master branch to rename the dag_id to become dag_id='my_dag4', no other changes to the file or the filename - this should effectively define a new DAG dag4 and remove dag3.

    Not true. Renaming an existing dag_id registers the new DAG, but it does not delete the previous DAG's records; they remain in the database. If you want to remove them, click the delete button for the previous dag_id to purge all associated records (DAG runs, task instances, etc.). Keep in mind that you might want to preserve previous DAG records for history, auditing, or other reasons.
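
If you would rather script the purge than click the delete button, Airflow 2.x also offers the `airflow dags delete <dag_id>` CLI command and a stable REST API endpoint, DELETE /api/v1/dags/{dag_id}. The sketch below only builds the REST request with the standard library. It assumes the webserver is reachable at http://localhost:8080 (for example through kubectl port-forward) and that HTTP basic auth is enabled for the API (the airflow.api.auth.backend.basic_auth backend); the host, the credentials and the helper name build_delete_dag_request are placeholders, not part of Airflow.

```python
import base64
import urllib.parse
import urllib.request


def build_delete_dag_request(base_url: str, dag_id: str, username: str, password: str) -> urllib.request.Request:
    """Build (without sending) a DELETE request for Airflow's stable REST API.

    Assumption: the webserver accepts HTTP basic auth on /api/v1.
    """
    # Percent-encode the dag_id so it is safe to embed in the URL path.
    url = f"{base_url.rstrip('/')}/api/v1/dags/{urllib.parse.quote(dag_id, safe='')}"
    credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url,
        method="DELETE",
        headers={"Authorization": f"Basic {credentials}"},
    )


# Hypothetical host and credentials; adjust for your deployment.
req = build_delete_dag_request("http://localhost:8080", "my_dag3", "admin", "admin")
print(req.get_method(), req.full_url)  # DELETE http://localhost:8080/api/v1/dags/my_dag3
# urllib.request.urlopen(req)  # uncomment to actually send the request
```

Sending the request has the same effect as the delete button, so the same caveat about losing run history applies.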

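    Note that Airflow works with serialized DAGs: your DAG files are parsed, the results are stored in the metadata database, and all Airflow services (including the webserver) read from that database. So from the webserver's perspective there are simply two records in the dag table; it does not know what changes you made to the Python file. The source shown in the Code view is also stored per file rather than per DAG, which is why dag3 displays the updated contents of dag_test.py.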
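
The behaviour described above can be illustrated with a toy model in plain Python. ToyMetadataDB and its methods are hypothetical and do not reflect Airflow's real schema or API. The model only assumes that each parse upserts the DAGs a file defines without removing old ones, and that source code is stored per file location, so looking up the code for an old dag_id returns the file's current contents.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ToyMetadataDB:
    """Toy stand-in for Airflow's metadata database (not its real schema)."""

    # dag_id -> file the DAG was parsed from (roughly the `dag` table)
    dags: Dict[str, str] = field(default_factory=dict)
    # file location -> source code (roughly the `dag_code` table, one entry per file)
    dag_code: Dict[str, str] = field(default_factory=dict)

    def sync_file(self, fileloc: str, source: str, dag_ids: List[str]) -> None:
        """Record one parse of `fileloc`: upsert each DAG it defines."""
        for dag_id in dag_ids:
            self.dags[dag_id] = fileloc  # insert or update; nothing is removed here
        self.dag_code[fileloc] = source  # one source entry per file, overwritten

    def code_for(self, dag_id: str) -> str:
        """Mimic the Code view: resolve the DAG's file, then return that file's source."""
        return self.dag_code[self.dags[dag_id]]


db = ToyMetadataDB()
db.sync_file("dags/dag_test.py", "dag = DAG(dag_id='my_dag3', ...)", ["my_dag3"])
# After git-sync pulls the rename, the same file is parsed again.
db.sync_file("dags/dag_test.py", "dag = DAG(dag_id='my_dag4', ...)", ["my_dag4"])

print(sorted(db.dags))         # ['my_dag3', 'my_dag4']
print(db.code_for("my_dag3"))  # dag = DAG(dag_id='my_dag4', ...)
```

Both symptoms from the question fall out of these two assumptions: the stale row survives, and its code lookup resolves to the renamed file.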

    In Airflow 2.5.0, the [scheduler] parsing_cleanup_interval configuration option was added to automatically deactivate stale DAGs. From the configuration reference:

    How often (in seconds) to check for stale DAGs (DAGs which are no longer present in the expected files) which should be deactivated, as well as datasets that are no longer referenced and should be marked as orphaned.
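
The check that this option schedules can be pictured with the toy function below. find_stale_dags is a hypothetical name, the datasets part is ignored, and Airflow's actual implementation differs in detail; the sketch only captures the rule quoted above, namely that a DAG is stale when it is no longer present in the file it was expected to come from.

```python
from typing import Dict, Set


def find_stale_dags(registered: Dict[str, str], latest_parse: Dict[str, Set[str]]) -> Set[str]:
    """Return the dag_ids that should be deactivated (toy model).

    registered:   dag_id -> file it was last parsed from (what the DB remembers)
    latest_parse: file -> dag_ids defined in that file on the most recent parse
    """
    stale = set()
    for dag_id, fileloc in registered.items():
        # Stale if the file disappeared or no longer defines this dag_id.
        if dag_id not in latest_parse.get(fileloc, set()):
            stale.add(dag_id)
    return stale


registered = {"my_dag3": "dags/dag_test.py", "my_dag4": "dags/dag_test.py"}
latest_parse = {"dags/dag_test.py": {"my_dag4"}}
print(find_stale_dags(registered, latest_parse))  # {'my_dag3'}
```

Because the real check runs periodically rather than immediately after each parse, a short window in which both DAGs are visible is presumably still possible; a smaller interval should narrow that window rather than eliminate it.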