I need to change the DAG in order to adjust the schedule using the Python plugin. The schedule must be Wednesday and Friday at midnight, i.e. 0 0 * * 3,5, but I don't know how to schedule on two days. So far I have only scheduled it on Friday. Here is the code I have for the Friday-only run (a quick check of the cron expression follows this first snippet):
"""
The function validates if the schedule_day parameter is a valid day to execute the task
schedule_day is a number and corresponds to day of the week
1 - Monday
2 - Tuesday
3 - Wednesday
4 - Thursday
5 - Friday
6 - Saturday
7 - Sunday
"""
def _check_valid_day(**kwargs):
today = datetime.today()
if today.isoweekday()==kwargs["schedule_day"]:
return True
else:
raise AirflowSkipException("does not correspond to the backup day")
today = datetime.today()
today_str = today.strftime("%Y-%m-%d")
#start_date = get_next_weekday(today_str, 4) # 4 = Friday
start_date = datetime(2022, 4, 2)
dag_id = "${ENV_NAME_UNDER}_data_bq_weekly_backups_dag"
event_collection_project_id = os.environ["EVENT_COLLECTION_PROJECT_ID"]
tables_to_backup = os.environ["TABLES_TO_BACKUP"]
destination_bucket = os.environ["WEEKLY_BQ_BACKUP_BUCKET"]
schedule_day = int(os.environ["BACKUP_SCHEDULE_DAY"])  # env vars are strings, so cast to int for the isoweekday() comparison
default_dag_args = {
    # Setting start date for next Friday in order to maintain the scheduler
    # in a consistent state
    "start_date": start_date,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least what's specified in retry_delay
    "retries": 1,
    "retry_delay": timedelta(seconds=10),
    "project_id": event_collection_project_id,
    "schedule_interval": "0 0 * * 5",
}
tables_to_backup_list = get_list_to_backup(tables_to_backup)
with DAG(dag_id=dag_id, default_args=default_dag_args, catchup=False) as dag:
    check_valid_day = PythonOperator(
        task_id='check_valid_day',
        python_callable=_check_valid_day,
        op_kwargs={
            "schedule_day": schedule_day,
        },
    )

    task_dict = dict()
    for table_to_backup in tables_to_backup_list:
        dataset = table_to_backup.split(".")[0]
        table = table_to_backup.split(".")[1]
        task_name = f"{dataset}_{table}_table_weekly_backup"
        task_dict[task_name] = BigQueryToGCSOperator(
            task_id=task_name,
            trigger_rule="all_success",
            dag=dag,
            # source table / destination GCS arguments are omitted in this snippet
        )
        check_valid_day >> task_dict[task_name]
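As a sanity check on the cron expression itself (this snippet is not part of the DAG, just something to confirm what the expression expands to), croniter, which Airflow already uses internally, shows that 0 0 * * 3,5 fires at midnight on Wednesdays and Fridays:

from datetime import datetime
from croniter import croniter

# Print the next few fire times of the expression, starting from the DAG start_date.
it = croniter("0 0 * * 3,5", datetime(2022, 4, 2))
for _ in range(4):
    print(it.get_next(datetime).strftime("%A %Y-%m-%d %H:%M"))
# -> Wednesday 2022-04-06 00:00, Friday 2022-04-08 00:00, and so on.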
I have tried:-"""
SCHEDULE_DAY = [3, 5]  # ISO weekdays: 3 = Wednesday, 5 = Friday

def _check_valid_day(**kwargs):
    today = datetime.today()
    if today.isoweekday() in SCHEDULE_DAY:
        return True
    else:
        raise AirflowSkipException("does not correspond to the backup day")
inputFilePattern = f"gs://{geodata_bucket_name}/zips/*.zip"
outputDirectory = f"gs://{geodata_bucket_name}/unzips/"
output_failure_file_path = f"gs://{geodata_bucket_name}/unzips/failed.csv"
default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "owner": "airflow",
    "start_date": execution_date,
    "depends_on_past": False,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least what's specified in retry_delay
    "retries": 1,
    "project_id": project_id,
    # "schedule_interval": GEODATA_SCHEDULE,
    "schedule_interval": "0 0 * * 3,5",
}
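Is something like this the right direction? This is only a sketch: the env var name BACKUP_SCHEDULE_DAYS, its comma-separated "3,5" format, and the stripped-down DAG are my own assumptions, not the real configuration.

import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator

# Assumption: BACKUP_SCHEDULE_DAYS="3,5" -> [3, 5] (ISO weekdays: 3 = Wednesday, 5 = Friday).
schedule_days = [int(d) for d in os.environ.get("BACKUP_SCHEDULE_DAYS", "3,5").split(",")]

def _check_valid_day(**kwargs):
    # Skip the downstream backup tasks unless today is one of the configured days.
    today = datetime.today()
    if today.isoweekday() in kwargs["schedule_days"]:
        return True
    raise AirflowSkipException("does not correspond to a backup day")

with DAG(
    dag_id="weekly_backups_sketch",  # hypothetical dag_id for this sketch
    start_date=datetime(2022, 4, 2),
    # The cron expression goes on the DAG itself, not in default_args.
    schedule_interval="0 0 * * 3,5",
    catchup=False,
    default_args={"retries": 1, "retry_delay": timedelta(seconds=10)},
) as dag:
    check_valid_day = PythonOperator(
        task_id="check_valid_day",
        python_callable=_check_valid_day,
        op_kwargs={"schedule_days": schedule_days},
    )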