Tags: pytest, airflow, directed-acyclic-graphs

How can we validate that DAG objects contain no cycles?


I am writing unit tests for my ETL pipelines, and as part of that I want to test every DAG to make sure it contains no cycles. In Data Pipelines with Apache Airflow by Bas Harenslak and Julian de Ruiter, the authors call DAG.test_cycle(), with DAG imported from the module airflow.models.dag. When I run the code, however, I get the error AttributeError: 'DAG' object has no attribute 'test_cycle'.

Here is my code snippet:

import glob
import importlib
import os

import pytest
from airflow.models.dag import DAG

DAG_PATH = os.path.join(os.path.dirname(__file__), "…", "…", "dags/**/*.py")
DAG_FILES = glob.glob(DAG_PATH, recursive=True)

@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    module_name, _ = os.path.splitext(dag_file)
    module_path = os.path.join(DAG_PATH, dag_file)
    mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)

    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]

    assert dag_objects

    for dag in dag_objects:
        dag.test_cycle()

Solution

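  • Update on the earlier answers:

    The cycle check is no longer a method on the DAG object, which is why dag.test_cycle() raises AttributeError. It now lives in airflow.utils.dag_cycle_tester. Because having a function with "test" in its name caused conflicts (presumably because pytest collects any function whose name starts with test), the Airflow team renamed it to check_cycle: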

    from airflow.utils.dag_cycle_tester import check_cycle

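    To run the actual check, replace dag.test_cycle() in the last line of the test with check_cycle(dag), keeping the for loop over dag_objects. check_cycle raises AirflowDagCycleException if the DAG contains a cycle, which fails the test.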
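
For intuition about what a cycle check of this kind does, here is a minimal sketch in plain Python. It is not Airflow's implementation and does not use Airflow at all. The function name find_cycle and the dict-of-lists graph (task id mapped to downstream task ids) are assumptions made up for illustration. The idea is a depth-first search that reports a cycle as soon as it reaches a task that is already on the current path.

```python
from typing import Dict, List, Optional


def find_cycle(graph: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one cycle as a list of task ids, or None if the graph is acyclic.

    `graph` maps each task id to the ids of its downstream tasks
    (a hypothetical stand-in for a DAG's task dependencies).
    """
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current path, fully explored
    state = {node: WHITE for node in graph}
    path: List[str] = []

    def visit(node: str) -> Optional[List[str]]:
        state[node] = GREY
        path.append(node)
        for downstream in graph.get(node, []):
            if state.get(downstream, WHITE) == GREY:
                # Back edge: downstream is already on the current path.
                return path[path.index(downstream):] + [downstream]
            if state.get(downstream, WHITE) == WHITE:
                cycle = visit(downstream)
                if cycle:
                    return cycle
        path.pop()
        state[node] = BLACK
        return None

    for node in list(graph):
        if state[node] == WHITE:
            cycle = visit(node)
            if cycle:
                return cycle
    return None


# Usage: a linear pipeline has no cycle, a loop back to the start does.
acyclic = {"extract": ["transform"], "transform": ["load"], "load": []}
cyclic = {"extract": ["transform"], "transform": ["load"], "load": ["extract"]}
print(find_cycle(acyclic))
print(find_cycle(cyclic))
```

In a real test suite you would still call check_cycle(dag) on the imported DAG objects. The sketch only shows why a dependency that points back upstream is detectable without running any task.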