Need to backup the data but it's not working in Airflow

I am trying to back up data in Airflow. The DAG does not raise any error, but no backup is produced because the tasks are skipped. The code I have written is:

import os
from airflow import DAG
from import (
from composer_plugins import get_list_to_backup
from datetime import datetime, timedelta, date
from airflow.exceptions import AirflowFailException, AirflowSkipException
from airflow.operators.python import PythonOperator

"""the function validates if the schedule_day parameter is a valid day to execute the task schedule_day is a number and corresponds to day of the week
1 - Monday
2 - Tuesday
3 - Wednesday
4 - Thursday
5 - Friday
6 - Saturday
7 - Sunday """
def _check_valid_day(**kwargs):
    """Return True when today is the configured backup day, otherwise skip.

    Expects a ``schedule_day`` keyword argument holding the ISO weekday
    number (1 = Monday ... 7 = Sunday) on which the backup should run.

    Raises:
        AirflowSkipException: if today's ISO weekday does not equal
            ``schedule_day``.
    """
    today = date.today()
    # isoweekday() returns an int and ``==`` performs no type coercion, so a
    # ``schedule_day`` supplied as a string (e.g. "3") never matches here.
    if today.isoweekday() == kwargs["schedule_day"]:
        return True
    raise AirflowSkipException("does not correspond to the backup day")

today =
today_str = today.strftime("%Y-%m-%d")
#start_date = get_next_weekday(today_str, 5)  # 5 = Saturday
start_date = datetime(2022, 5, 2)
dag_id = "data_bq_weekly_backups_dag"

# Runtime configuration is read from environment variables; a missing variable
# raises KeyError at import time. os.environ values are always strings, so a
# numeric setting such as BACKUP_SCHEDULE_DAY is the str "3", not the int 3.
event_collection_project_id = os.environ["EVENT_COLLECTION_PROJECT_ID"]
tables_to_backup = os.environ["TABLES_TO_BACKUP"]
destination_bucket = os.environ["WEEKLY_BQ_BACKUP_BUCKET"]
schedule_day =os.environ["BACKUP_SCHEDULE_DAY"]

default_dag_args = {
# Setting start date for next Saturday in order to maintain the scheduler
# in a consistent state
"start_date": start_date,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
"email_on_failure": False,
"email_on_retry": False,
# If a task fails, retry it once after waiting at least what's specified in retry_delay
"retries": 1,
"retry_delay": timedelta(seconds=10),
"project_id": event_collection_project_id,
"schedule_interval": "0 2 * * *",

tables_to_backup_list = get_list_to_backup(tables_to_backup)

with DAG(dag_id=dag_id, default_args=default_dag_args,catchup=False) as dag:

check_valid_day = PythonOperator(
            "schedule_day": schedule_day
task_dict = dict()

for table_to_backup in tables_to_backup_list:
    dataset = table_to_backup.split(".")[0]
    table = table_to_backup.split(".")[1]
    task_name = f"{dataset}_{table}_table_weekly_backup"
    task_dict[task_name] = BigQueryToGCSOperator(
        export_format="AVRO",  # OPTIONS: AVRO, CSV, JSON
        compression="NONE",  # OPTIONS: NONE, DEFLATE, GZIP, SNAPPY
    check_valid_day >>  task_dict[task_name]

When I execute this DAG there is no error, but every task is skipped (see the Airflow DAG tree view). BACKUP_SCHEDULE_DAY=3 is set in the environment variable file. I don't know what is wrong or why it is not working.


  • I tried your code and was able to reproduce your issue. See the run history below:

    NOTE: Prior to running your code, I hardcoded values like your environment variables and tables_to_backup_list to make it work on my environment.

    The main problem is in _check_valid_day(). The condition if today.isoweekday()==kwargs["schedule_day"] is always false because its two operands have mismatched data types: isoweekday() returns an int, while kwargs["schedule_day"] is a str.

    print(today.isoweekday()) # <class 'int'>
    print(kwargs["schedule_day"]) # <class 'str'>

    The fix is to make their data types match. The version below converts kwargs["schedule_day"] to int:

    def _check_valid_day(**kwargs):
        """Return True when today is the configured backup day, otherwise skip.

        Expects a ``schedule_day`` keyword argument holding the ISO weekday
        number (1 = Monday ... 7 = Sunday), as an int or a numeric string.

        Raises:
            AirflowSkipException: if today's ISO weekday does not equal
                ``schedule_day``.
            ValueError: if ``schedule_day`` cannot be converted to an int.
        """
        today = date.today()
        # The value originates from an environment variable and is therefore a
        # str; convert it so it can be compared with the int from isoweekday().
        if today.isoweekday() == int(kwargs["schedule_day"]):  # convert to int
            print("inside the if statement")
            return True
        # Must sit outside the ``if``: placed after ``return True`` it would be
        # unreachable and the task would succeed on non-backup days.
        raise AirflowSkipException("does not correspond to the backup day")

    Graph view:

    check_valid_day Logs:

