
How to force failure of DAG if it takes too long (dagrun_timeout)?


I'm having issues trying to figure out how to use the dagrun_timeout parameter.

Use case: I'm trying to put a time limit on how long a DAG run can take. I want the DAG to fail if it reaches the time limit, regardless of what state any task is in.

It seems, based on this answer, that a timeout failure wouldn't come while a task is running. The problem is that even if the timeout check is only made after a task completes, I'm still not seeing the expected timeout failure.

In the example DAG below, the run should time out after t1 based on 'dagrun_timeout': timedelta(seconds=10). The DAG, however, continues to run and finishes all tasks successfully. In this example I've also made sure that the total runtime of all the tasks is longer than the interval to the next scheduled run, but it's still not timing out.

Currently running Airflow 2.0.2

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 1),
    'retries': 0,
    'execution_timeout':timedelta(seconds=65),
    'dagrun_timeout': timedelta(seconds=10),
    'retry_delay': timedelta(seconds=5)
}

dag = DAG(
    'min_timeout',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1),
    max_active_runs=1
)

t1 = BashOperator(
    task_id='fast_task',
    bash_command='sleep 60',
    dag=dag)

t2 = BashOperator(
    task_id='slow_task',
    bash_command='sleep 5',
    dag=dag)

t3 = BashOperator(
    task_id='fast_task_2',
    bash_command='sleep 5',
    dag=dag)

t1 >> t2 >> t3

Solution

  • dagrun_timeout (a misleading name) is closer to a "cache eviction" sort of timeout: the only time it comes into play is when a DAG has reached its maximum active runs, at which point one of the older runs that has exceeded its timeout will be "evicted" (i.e. failed).

    So it might be better to set execution_timeout per task in this case (see the sketch below).
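
For illustration, here is a minimal sketch of that suggestion applied to the DAG from the question: execution_timeout is a per-task (BaseOperator) argument, so a task instance that runs past it is killed and marked failed. The 10-second values are just placeholders, and dagrun_timeout is shown for completeness as a DAG-level constructor argument, with the eviction behavior described above.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 1),
    'retries': 0,
    'retry_delay': timedelta(seconds=5)
}

dag = DAG(
    'min_timeout',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1),
    max_active_runs=1,
    # dagrun_timeout is a DAG-level argument; per the answer above, it only
    # fails an older run once max_active_runs has been reached.
    dagrun_timeout=timedelta(seconds=10)
)

# execution_timeout is enforced per task instance: if the task runs longer
# than the timedelta, it is killed and marked failed.
t1 = BashOperator(
    task_id='fast_task',
    bash_command='sleep 60',
    execution_timeout=timedelta(seconds=10),  # placeholder value
    dag=dag)

t2 = BashOperator(
    task_id='slow_task',
    bash_command='sleep 5',
    execution_timeout=timedelta(seconds=10),
    dag=dag)

t3 = BashOperator(
    task_id='fast_task_2',
    bash_command='sleep 5',
    execution_timeout=timedelta(seconds=10),
    dag=dag)

t1 >> t2 >> t3

With this setup, t1 ('sleep 60') exceeds its 10-second execution_timeout and fails, which in turn fails the run, which is closer to the behavior the question is after.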