I need to change the time my DAG runs at to midday. I followed the advice from various questions on here: I deleted the DAG and uploaded a renamed version with a new dag_id. Even though the file was renamed from, say, dag_1.py to new_dag_1.py, the dag_id was changed, and the schedule interval was changed from '@daily' to '0 12 * * *', when I upload this new DAG version the schedule column on the homepage for this DAG still says '1 day, 0:00:00'.
I uploaded a completely different DAG with these parameters and the schedule interval correctly shows '0 12 * * *', so there must be something in the changed DAG, or some metadata, that is preventing Airflow from seeing it as new. Other than the name of the DAG file and the dag_id, is there anything else I should have changed for Airflow to let me set this to run at a different time?
EDIT: I have recreated this problem with some generic code:
Here is the first version of the dag: dag_1.py
import airflow
import datetime
import logging
from airflow.utils.task_group import TaskGroup
from airflow.operators.dummy import DummyOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'my_name',
    'depends_on_past': False,
    'email': ['my_email'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'slack_conn_id': 'slack_conn',
    'start_date': YESTERDAY,
    'project_id': 'my_project',
    'dataset_id': 'my_dataset',
    'schedule_interval': '@daily',
}

with airflow.DAG(
    'test_cron',
    catchup=False,
    default_args=default_args,
    tags=['example']
) as dag:
    my_empty_operator = DummyOperator(
        task_id='empty_task',
        dag=dag,
    )
    my_empty_operator
And the updated version dag_1_2.py
import airflow
import datetime
import logging
from airflow.utils.task_group import TaskGroup
from airflow.operators.dummy import DummyOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'my_name',
    'depends_on_past': False,
    'email': ['my_email'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'slack_conn_id': 'slack_conn',
    'start_date': YESTERDAY,
    'project_id': 'my_project',
    'dataset_id': 'my_dataset',
    'schedule_interval': '0 12 * * *',
}

with airflow.DAG(
    'test_cron_2',
    catchup=False,
    default_args=default_args,
    tags=['example']
) as dag:
    my_empty_operator = DummyOperator(
        task_id='empty_task',
        dag=dag,
    )
    my_empty_operator
Note that I have changed (1) the name of the file, (2) the dag_id, and (3) the schedule_interval.
However, the new DAG, dag_1_2.py, still shows in the UI with a schedule of '1 day, 0:00:00'. How is this possible? For all intents and purposes this is a new DAG (or so it seems?)
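The schedule is shown as '1 day, 0:00:00' (the default) and is not updated because you are defining schedule_interval in default_args. The entries in default_args are passed to the tasks, not to the DAG itself, so the value never reaches the scheduler. You should define schedule_interval on the DAG object directly. See the Airflow scheduling docs: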
The default schedule_interval is one day (datetime.timedelta(1)). You must specify a different schedule_interval directly to the DAG object you instantiate, not as a default_param, as task instances do not override their parent DAG’s schedule_interval.
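To illustrate why the UI shows '1 day, 0:00:00', here is a minimal toy model of the behaviour described above. ToyDAG and ToyOperator are hypothetical stand-ins written for this answer, not Airflow's real classes; the sketch only assumes what the docs state, namely that the DAG's schedule defaults to datetime.timedelta(1) and that default_args are applied to tasks.

```python
import datetime


class ToyDAG:
    """Hypothetical stand-in for a DAG (NOT Airflow's real class)."""

    def __init__(self, dag_id,
                 schedule_interval=datetime.timedelta(days=1),
                 default_args=None):
        self.dag_id = dag_id
        # The schedule comes only from this keyword argument.
        self.schedule_interval = schedule_interval
        # default_args are stored and handed to tasks later.
        self.default_args = default_args or {}


class ToyOperator:
    """Hypothetical stand-in for an operator (NOT Airflow's real class)."""

    def __init__(self, task_id, dag, **kwargs):
        # Explicit task arguments win; default_args fill in the rest.
        params = {**dag.default_args, **kwargs}
        self.task_id = task_id
        self.owner = params.get('owner')
        self.retries = params.get('retries', 0)


default_args = {
    'owner': 'my_name',
    'retries': 0,
    'schedule_interval': '0 12 * * *',  # misplaced: the DAG never reads this
}

# Schedule hidden in default_args: the DAG keeps its default.
wrong = ToyDAG('test_cron_2', default_args=default_args)
# Schedule passed to the DAG directly: the DAG uses it.
right = ToyDAG('test_cron_2', schedule_interval='0 12 * * *',
               default_args={'owner': 'my_name', 'retries': 0})
task = ToyOperator('empty_task', dag=wrong)

print(wrong.schedule_interval)  # 1 day, 0:00:00
print(right.schedule_interval)  # 0 12 * * *
print(task.owner)               # my_name
```

The string '1 day, 0:00:00' is simply how Python prints datetime.timedelta(days=1), which matches what the question reports for both versions of the DAG.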
Your DAG should be updated in this manner:
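import airflow
import datetime
from airflow.operators.dummy import DummyOperator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# default_args holds task-level defaults only; no schedule_interval here
default_args = {
    'owner': 'my_name',
    'depends_on_past': False,
    'email': ['my_email'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'slack_conn_id': 'slack_conn',
    'start_date': YESTERDAY,
    'project_id': 'my_project',
    'dataset_id': 'my_dataset',
}

with airflow.DAG(
    'test_cron_2',
    catchup=False,
    default_args=default_args,
    schedule_interval='0 12 * * *',  # define schedule_interval on the DAG object
    tags=['example']
) as dag:
    # same task as in the question
    my_empty_operator = DummyOperator(
        task_id='empty_task',
        dag=dag,
    )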
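If you want to catch this kind of mistake before uploading, a small check along these lines may help. It is only a sketch: misplaced_dag_keys and DAG_LEVEL_KEYS are hypothetical names made up for this answer, and the key set is a short, deliberately incomplete list of arguments that belong on the DAG constructor rather than in default_args.

```python
# DAG constructor arguments that have no effect inside default_args.
# Assumption: this list is illustrative and incomplete.
DAG_LEVEL_KEYS = {'schedule_interval', 'catchup', 'tags', 'max_active_runs'}


def misplaced_dag_keys(default_args):
    """Return the DAG-level keys found in default_args, sorted by name."""
    return sorted(DAG_LEVEL_KEYS & set(default_args))


question_args = {
    'owner': 'my_name',
    'retries': 0,
    'schedule_interval': '0 12 * * *',
}
fixed_args = {'owner': 'my_name', 'retries': 0}

print(misplaced_dag_keys(question_args))  # ['schedule_interval']
print(misplaced_dag_keys(fixed_args))     # []
```

Running it against the default_args from the question flags schedule_interval, while the corrected dictionary comes back clean.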