How to set a retry count and retry delay for a task in an Airflow DAG?


In my Airflow DAG I have 4 tasks:

task_1 >> [task_2,task_3]>> task_4

task_4 runs only after both task_2 and task_3 have succeeded.

How do I set a condition such as:

If task_2 fails, retry task_2 after 2 minutes and stop retrying after the 5th attempt.

This is my code:

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'Anti',
    'start_date': days_ago(1),  # 1 means yesterday
}

dag = DAG(dag_id='my_sample_dag', default_args=args, schedule_interval='15 * * * *')

def func1(**context):
    print("ran task 1")

def func2(**context):
    print("ran task 2")

def func3(**context):
    print("ran task 3")

def func4(**context):
    print("ran task 4")

with dag:
    task_1 = PythonOperator(
        task_id='task1',
        python_callable=func1,
        provide_context=True,
    )
    task_2 = PythonOperator(
        task_id='task2',
        python_callable=func2,
        provide_context=True,
    )
    task_3 = PythonOperator(
        task_id='task3',
        python_callable=func3,
        provide_context=True,
    )
    task_4 = PythonOperator(
        task_id='task4',
        python_callable=func4,
        provide_context=True,
    )

task_1 >> [task_2, task_3] >> task_4  # task_2 and task_3 run in parallel after task_1 succeeds


Solution

  • Every operator supports retry_delay and retries, as described in the Airflow documentation:

    retries (int) – the number of retries that should be performed before failing the task

    retry_delay (datetime.timedelta) – delay between retries

    If you want to apply this to all of your tasks, you can just edit your args dictionary:

    from datetime import timedelta

    args = {
        'owner': 'Anti',
        'retries': 5,
        'retry_delay': timedelta(minutes=2),
        'start_date': days_ago(1),  # 1 means yesterday
    }
    

    If you only want to apply it to task_2, pass retries and retry_delay directly to that PythonOperator; the other tasks then keep the default settings.

    One comment on your args: it's not recommended to set a dynamic, relative start_date such as days_ago(1); use a fixed, absolute date instead.