I am writing unit tests for my ETLs, and as part of that process I want to test all DAGs to make sure they do not contain cycles. In Data Pipelines with Apache Airflow by Bas Harenslak and Julian de Ruiter, the authors use DAG.test_cycle(), where DAG is imported from the module airflow.models.dag. When I run the code, however, I get the following error: AttributeError: 'DAG' object has no attribute 'test_cycle'
Here is my code snippet:
import glob
import importlib.util
import os

import pytest
from airflow.models.dag import DAG

# Glob pattern matching every DAG file, relative to this test file
DAG_PATH = os.path.join(os.path.dirname(__file__), "…", "…", "dags/**/*.py")
DAG_FILES = glob.glob(DAG_PATH, recursive=True)


@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    # Import the DAG file as a module so its top-level DAG objects get created
    module_name, _ = os.path.splitext(dag_file)
    module_path = dag_file  # glob already returns the full path to each file
    mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)

    # Collect every DAG object defined at module level
    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
    assert dag_objects

    for dag in dag_objects:
        dag.test_cycle()  # fails here with the AttributeError
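Update on the answers above:
Because having a function with "test" in its name caused conflicts (pytest treats any function named test* in a test module's namespace, including imported ones, as a test case), the Airflow team renamed the function to check_cycle: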
from airflow.utils.dag_cycle_tester import check_cycle
To run the actual test, change the final two lines so that each DAG is passed to the function, i.e. replace dag.test_cycle() with check_cycle(dag).
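To illustrate what such a cycle check verifies, here is a minimal, stdlib-only sketch. It is not Airflow's implementation: it assumes a hypothetical representation in which each task ID maps to the task IDs directly downstream of it, and runs an iterative three-colour depth-first search that reports a cycle as soon as it reaches a task that is still on the current path.

```python
from typing import Dict, Iterable

# Hypothetical representation: task_id -> task_ids directly downstream of it
Graph = Dict[str, Iterable[str]]


def has_cycle(graph: Graph) -> bool:
    """Return True if the directed task graph contains a cycle."""
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, fully explored
    colour = {node: WHITE for node in graph}
    # Tasks that only appear as downstream targets also need a colour
    for targets in graph.values():
        for target in targets:
            colour.setdefault(target, WHITE)

    for start in list(colour):
        if colour[start] != WHITE:
            continue
        colour[start] = GREY
        # Each stack entry keeps a node and an iterator over its remaining children
        stack = [(start, iter(graph.get(start, ())))]
        while stack:
            node, children = stack[-1]
            for child in children:
                if colour[child] == GREY:
                    return True  # back edge to a task on the current path: cycle
                if colour[child] == WHITE:
                    colour[child] = GREY
                    stack.append((child, iter(graph.get(child, ()))))
                    break  # descend into the child before visiting siblings
            else:
                # All children explored without finding a back edge
                colour[node] = BLACK
                stack.pop()
    return False


print(has_cycle({"extract": ["transform"], "transform": ["load"], "load": []}))  # False
print(has_cycle({"extract": ["transform"], "transform": ["load"], "load": ["extract"]}))  # True
```

Unlike this boolean sketch, Airflow's check_cycle signals a cycle by raising AirflowDagCycleException, so in a pytest test like the one in the question a cyclic DAG fails the test without an explicit assert.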