
Airflow DAG Scheduling last day of month -n days


I want to schedule my DAG to run 3 days before the last day of the month, so in February it should run on the 25th, whereas in March it should run on the 28th. Any ideas on how I could schedule this?

Thanks


Solution

    For Airflow >= 2.2.0:

    AIP-39 (Richer scheduler_interval) is available, so you can define your own Timetable to control scheduling; see the Airflow how-to guide on customizing DAG scheduling with Timetables. You will need to register the Timetable via a plugin and implement the scheduling logic in it.
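    The logic such a Timetable has to implement boils down to "given a date, find the next day that is 3 days before the end of its month". Below is a minimal, Airflow-independent sketch of that calculation, assuming plain datetime.date values. target_day, next_run_date and DAYS_BEFORE_END are hypothetical names, and wiring them into a Timetable subclass (for example from its next_dagrun_info method) is left to the how-to guide:

```python
import calendar
from datetime import date, timedelta

# Hypothetical constant: how many days before the last day of the month to run
DAYS_BEFORE_END = 3


def target_day(year: int, month: int) -> date:
    """Return the date DAYS_BEFORE_END days before the last day of the month."""
    last_day = calendar.monthrange(year, month)[1]  # number of days in the month
    return date(year, month, last_day) - timedelta(days=DAYS_BEFORE_END)


def next_run_date(after: date) -> date:
    """Return the first target day strictly after `after`."""
    candidate = target_day(after.year, after.month)
    if candidate > after:
        return candidate
    # This month's target has already passed: use next month's target.
    if after.month == 12:
        return target_day(after.year + 1, 1)
    return target_day(after.year, after.month + 1)


if __name__ == "__main__":
    day = date(2021, 1, 1)
    for _ in range(4):
        day = next_run_date(day)
        print(day)
```

    Starting from 2021-01-01, four successive calls return 2021-01-28, 2021-02-25, 2021-03-28 and 2021-04-27, matching the dates in the question; in a leap year February's target is the 26th.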

    For Airflow < 2.2.0:

    You can only schedule a DAG if the schedule can be expressed as a single cron expression. This one can't, so it is not supported out of the box. You can, however, pick a cron expression that is close enough, such as 0 0 25-31 * * (midnight on every day of the month from the 25th through the 31st), and place a ShortCircuitOperator at the beginning of your DAG that checks whether the date really is 3 days before the end of the month. If it is, the downstream tasks run; if it isn't, they are skipped:

    import calendar
    from datetime import date, datetime, timedelta

    from airflow.models import DAG
    from airflow.operators.dummy import DummyOperator
    from airflow.operators.python import ShortCircuitOperator

    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2021, 8, 21),
    }


    def check_if_3_days_before_end_of_month(execution_date):
        # Airflow injects execution_date (a timezone-aware pendulum DateTime)
        # from the task context. Compare calendar dates only: in Python a
        # datetime never compares equal to a plain date.
        run_date = execution_date.date()
        # calendar.monthrange returns a tuple (weekday of the first day of the
        # month, number of days in the month)
        last_day_of_month = calendar.monthrange(run_date.year, run_date.month)[1]
        target_date = date(run_date.year, run_date.month, last_day_of_month) - timedelta(days=3)
        # True lets the downstream tasks run; False skips them
        return run_date == target_date


    with DAG(
        dag_id='short_example',
        # 00:00 on every day of the month from the 25th through the 31st
        schedule_interval='0 0 25-31 * *',
        default_args=default_args,
    ) as dag:
        first = ShortCircuitOperator(
            task_id='verify_date',
            python_callable=check_if_3_days_before_end_of_month,
        )

        second = DummyOperator(task_id='task')

        first >> second
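    Why 25-31 is wide enough: the target day is the month's length minus 3, which always falls between the 25th (a 28-day February) and the 28th (a 31-day month). A quick standalone check of that claim in plain Python, with no Airflow needed; CRON_DAYS, target_day_of_month and check_cron_window are hypothetical names:

```python
import calendar

# Day-of-month field "25-31" of the cron expression, as a Python range
CRON_DAYS = range(25, 32)


def target_day_of_month(year: int, month: int, days_before_end: int = 3) -> int:
    """Return the day of the month that is `days_before_end` days before its last day."""
    days_in_month = calendar.monthrange(year, month)[1]
    return days_in_month - days_before_end


def check_cron_window(year: int) -> dict:
    """Map each month of `year` to its target day.

    Raises ValueError if a target day falls outside CRON_DAYS, i.e. if the
    cron expression would never produce a run for that day.
    """
    targets = {}
    for month in range(1, 13):
        day = target_day_of_month(year, month)
        if day not in CRON_DAYS:
            raise ValueError(f"{year}-{month:02d}-{day:02d} is outside the cron window")
        targets[month] = day
    return targets


if __name__ == "__main__":
    print(check_cron_window(2021))  # non-leap year: February -> 25
    print(check_cron_window(2024))  # leap year: February -> 26
```

    For every month exactly one day inside the window matches, so the ShortCircuitOperator lets one run per month through and skips the others.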
    

    [Screenshot: example run for 2021-01-30]

    [Screenshot: example run for 2021-01-28]

    Note: make sure you compare the date that actually matters to you. This example compares the DAG run's execution_date, which marks the start of the run's schedule interval rather than the moment the run starts.
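    In Airflow < 2.2, a run's execution_date is the start of its schedule interval, and the run is only triggered at the next cron tick. With 0 0 25-31 * *, the run whose execution_date is 2021-01-28 therefore starts on 2021-01-29, and the run for the 31st starts on the 25th of the following month. A small standalone sketch of that mapping, assuming daily ticks at midnight and ignoring times and time zones (trigger_date and CRON_DAYS are hypothetical names):

```python
import calendar
from datetime import date

# Day-of-month field of the cron expression "0 0 25-31 * *"
CRON_DAYS = set(range(25, 32))


def trigger_date(execution_date: date) -> date:
    """Return the cron tick after `execution_date`, i.e. when that run actually starts.

    Assumes `execution_date` is itself a tick (a day in CRON_DAYS).
    """
    year, month, day = execution_date.year, execution_date.month, execution_date.day
    last_day = calendar.monthrange(year, month)[1]
    # Is there a later tick in the same month?
    for candidate in range(day + 1, last_day + 1):
        if candidate in CRON_DAYS:
            return date(year, month, candidate)
    # Otherwise the run starts at the first tick of the next month.
    year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return date(year, month, min(CRON_DAYS))


if __name__ == "__main__":
    print(trigger_date(date(2021, 1, 28)))  # run for the target day starts a day later
    print(trigger_date(date(2021, 1, 31)))  # run for the 31st waits until February
```

    If the task must run on the target day itself rather than the day after, one option is to have the callable take next_execution_date from the context (available in Airflow 2.0/2.1, deprecated in 2.2 in favour of data_interval_end) and compare that instead of execution_date.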