Tags: airflow, airflow-scheduler

How to find out the delayed jobs in Airflow


Some of my DAGs are waiting to get scheduled, and some are waiting in the queue. I suspect there are reasons for this delay, but I'm not sure how to start debugging the problem. The majority of the pipelines run Spark jobs.

Can someone give me some direction on where to look to 1) analyse which DAGs were delayed (did not start at the scheduled time) and 2) find out whether the resources are sufficient? I'm quite new to scheduling in Airflow. Many thanks. Please let me know if I can describe the question better.


Solution

  • If you are looking for code that takes advantage of Airflow's wider capabilities:

    There are three classes within airflow.models which can be harnessed.

    1. To programmatically retrieve all DAGs which your Airflow is aware of, we import DagBag. From the docs: "A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings."
    2. We utilise DagModel and its method get_current to fetch the current model for each dag_id present in our bag.
    3. We check whether each DAG is active using the DagModel property is_paused.
    4. We retrieve that DAG's runs using DagRun.find.
    5. We sort the runs from latest to earliest.
    6. Here you could just subset [0] to get the latest run; however, for your debugging purposes I loop through them all.
    7. DagRun returns a lot of information for us to use. In my loop I output print(i, run.state, run.execution_date, run.start_date) so you can see what is going on under the hood. The available fields are:

    id, state, dag_id, queued_at, execution_date, start_date, end_date, run_id, data_interval_start, data_interval_end, last_scheduling_decision

    8. I have commented out an if check for any queued DAGs for you to uncomment. Additionally, you can do some arithmetic on the dates if you desire, to add further conditional functionality.
    from datetime import datetime, timedelta
    
    from airflow import DAG
    from airflow.models import DagBag, DagModel, DagRun
    from airflow.operators.python import PythonOperator
    
    
    # for each active (unpaused) DAG in the bag, print its runs from latest to earliest
    
    def check_dag_active():
        bag = DagBag()
        for dag_id in bag.dags:
            in_bag = DagModel.get_current(dag_id)
            if not in_bag.is_paused:
                latest = DagRun.find(dag_id=dag_id)
                latest.sort(key=lambda x: x.execution_date, reverse=True)
                for i, run in enumerate(latest):
                    print(i, run.state, run.execution_date, run.start_date)
                    # if run.state == 'queued':
                    #     return [run.dag_id, run.execution_date, run.start_date]
    
    with DAG(
        'stack_overflow_ans_3',
        tags=['SO'],
        start_date=datetime(2022, 1, 1),
        schedule_interval=None,
        catchup=False,
        is_paused_upon_creation=False
    ) as dag:

        t1 = PythonOperator(
            task_id='task_that_will_fail',
            python_callable=check_dag_active
        )