airflow · airflow-scheduler · google-cloud-composer

Airflow schedule interval stuck at '1 day, 0:00:00'


I need to change the time my DAG runs at to midday. Following advice from various questions on here, I deleted the DAG and uploaded a renamed copy with a new dag_id. So the original file was renamed from, say, dag_1.py to new_dag_1.py, the dag_id was changed, and the schedule interval was changed from '@daily' to '0 12 * * *'. Yet when I upload this new version, the schedule column on the homepage for this DAG still says '1 day, 0:00:00'.

I uploaded a completely different DAG with these parameters and its schedule interval correctly shows '0 12 * * *', so there must be something in the changed DAG, or some metadata, that is preventing Airflow from seeing it as new. Other than the file name and the dag_id, both of which I changed, is there anything else I should have changed for Airflow to let me run this at a different time?

EDIT: I have recreated this problem with some generic code:

Here is the first version of the dag: dag_1.py

import airflow
import datetime
import logging
from airflow.utils.task_group import TaskGroup
from airflow.operators.dummy import DummyOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'my_name',
    'depends_on_past': False,
    'email': ['my_email'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'slack_conn_id': 'slack_conn',
    'start_date': YESTERDAY,
    'project_id': 'my_project',
    'dataset_id': 'my_dataset',
    'schedule_interval': '@daily',
}

with airflow.DAG(
        'test_cron',
        catchup=False,
        default_args=default_args,
        tags=['example']
    ) as dag:

    my_empty_operator = DummyOperator(
        task_id='empty_task',
        dag=dag,
    )

    my_empty_operator

And the updated version dag_1_2.py

import airflow
import datetime
import logging
from airflow.utils.task_group import TaskGroup
from airflow.operators.dummy import DummyOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'my_name',
    'depends_on_past': False,
    'email': ['my_email'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'slack_conn_id': 'slack_conn',
    'start_date': YESTERDAY,
    'project_id': 'my_project',
    'dataset_id': 'my_dataset',
    'schedule_interval': '0 12 * * *',
}

with airflow.DAG(
        'test_cron_2',
        catchup=False,
        default_args=default_args,
        tags=['example']
    ) as dag:

    my_empty_operator = DummyOperator(
        task_id='empty_task',
        dag=dag,
    )

    my_empty_operator

Note I have changed: 1. the name of the file, 2. the dag_id, 3. the schedule_interval.

However the new DAG, dag_1_2.py, still shows in the UI with a schedule of '1 day, 0:00:00'. How is this possible? For all intents and purposes this is a new DAG (or so it seems?)


Solution

  • The schedule shows as '1 day, 0:00:00' (the default) and is not updated because you are defining schedule_interval inside default_args. You should define schedule_interval on the DAG object directly. See the Airflow scheduling docs.

    The default schedule_interval is one day (datetime.timedelta(days=1)). You must pass schedule_interval directly to the DAG object you instantiate, not via default_args: the entries in default_args are handed down to the DAG's tasks, not to the DAG itself, so a schedule_interval placed there never reaches the DAG and the default is used instead.

    Your DAG should be updated in this manner:

    default_args = {
        'owner': 'my_name',
        'depends_on_past': False,
        'email': ['my_email'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'slack_conn_id': 'slack_conn',
        'start_date': YESTERDAY,
        'project_id': 'my_project',
        'dataset_id': 'my_dataset',
    }
    
    with airflow.DAG(
            'test_cron_2',
            catchup=False,
            default_args=default_args,
            schedule_interval='0 12 * * *',  # define schedule_interval on the DAG object
            tags=['example']
        ) as dag:

        my_empty_operator = DummyOperator(
            task_id='empty_task',
        )
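As an aside, the '1 day, 0:00:00' string on the DAGs page appears to be just Python's own rendering of the default interval, datetime.timedelta(days=1). A quick stdlib-only check (no Airflow install needed) shows where the string comes from:

```python
import datetime

# When no schedule_interval reaches the DAG object, Airflow falls back
# to its default of one day, stored as a timedelta.
default_interval = datetime.timedelta(days=1)

# The string shown in the UI matches Python's str() of that value:
print(str(default_interval))  # → 1 day, 0:00:00
```

So seeing '1 day, 0:00:00' rather than your cron expression is a reliable hint that the DAG object itself never received a schedule_interval.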