Tags: python, google-cloud-platform, airflow, airflow-scheduler, google-cloud-composer

Need to back up BigQuery data in Airflow, but every task is skipped


I am trying to back up BigQuery data in Airflow. The DAG raises no error, but the backup never happens because every task gets skipped. The code I have written is:

import os
from airflow import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import (
    BigQueryToGCSOperator,
)
from composer_plugins import get_list_to_backup
from datetime import datetime, timedelta, date
from airflow.exceptions import AirflowFailException, AirflowSkipException
from airflow.operators.python import PythonOperator

"""the function validates if the schedule_day parameter is a valid day to execute the task schedule_day is a number and corresponds to day of the week
1 - Monday
2 - Tuesday
3 - Wednesday
4 - Thursday
5 - Friday
6 - Saturday
7 - Sunday """
def _check_valid_day(**kwargs):
    today = datetime.today()
    if today.isoweekday()==kwargs["schedule_day"]:
        return True
    else:
     raise AirflowSkipException("does not correspond to the backup day")

today = datetime.today()
today_str = today.strftime("%Y-%m-%d")
#start_date = get_next_weekday(today_str, 5)  # 5 = Saturday
start_date = datetime(2022, 5, 2)
dag_id = "data_bq_weekly_backups_dag"

event_collection_project_id = os.environ["EVENT_COLLECTION_PROJECT_ID"]
tables_to_backup = os.environ["TABLES_TO_BACKUP"]
destination_bucket = os.environ["WEEKLY_BQ_BACKUP_BUCKET"]
schedule_day = os.environ["BACKUP_SCHEDULE_DAY"]

default_dag_args = {
    # Setting start date for next Saturday in order to maintain the scheduler
    # in a consistent state
    "start_date": start_date,
    # To email on failure or retry, set the 'email' arg to your email and
    # enable emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least what's specified in retry_delay
    "retries": 1,
    "retry_delay": timedelta(seconds=10),
    "project_id": event_collection_project_id,
    "schedule_interval": "0 2 * * *",
}


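# get_list_to_backup comes from the custom composer_plugins module; presumably it
# parses the TABLES_TO_BACKUP environment variable into a list of "dataset.table" strings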
tables_to_backup_list = get_list_to_backup(tables_to_backup)

with DAG(dag_id=dag_id, default_args=default_dag_args, catchup=False) as dag:

    check_valid_day = PythonOperator(
        task_id="check_valid_day",
        python_callable=_check_valid_day,
        op_kwargs={
            "schedule_day": schedule_day,
        },
    )

    task_dict = dict()

    for table_to_backup in tables_to_backup_list:
        dataset = table_to_backup.split(".")[0]
        table = table_to_backup.split(".")[1]
        task_name = f"{dataset}_{table}_table_weekly_backup"
        task_dict[task_name] = BigQueryToGCSOperator(
            task_id=task_name,
            trigger_rule="all_success",
            dag=dag,
            source_project_dataset_table=table_to_backup,
            destination_cloud_storage_uris=[
                f"gs://{destination_bucket}/{dataset}/{table}/{today.year}/{today.month}/{today.day}/{table}-*.avro"
            ],
            export_format="AVRO",  # OPTIONS: AVRO, CSV, JSON
            compression="NONE",  # OPTIONS: NONE, DEFLATE, GZIP, SNAPPY
            labels=None,
        )
        check_valid_day >> task_dict[task_name]

When I execute this DAG, there is no error, but everything gets skipped:

[screenshot: Airflow DAG tree view, all tasks skipped]

BACKUP_SCHEDULE_DAY=3 is set in the environment variable file. I don't know what's wrong here or why it isn't working.


Solution

  • I tried your code and was able to reproduce your issue. See the run history below:

    [screenshot: run history, all tasks skipped]

    NOTE: Prior to running your code, I hardcoded values such as your environment variables and tables_to_backup_list to make it work in my environment.

    The main problem is in _check_valid_day(). The comparison today.isoweekday() == kwargs["schedule_day"] is always False because the two sides have a data type mismatch:

    print(type(today.isoweekday()))      # <class 'int'>
    print(type(kwargs["schedule_day"]))  # <class 'str'>
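
    For context, Python does not raise an error when you compare values of different types with ==; the comparison simply evaluates to False, which is why the skip branch is taken on every run. A quick illustration:

    print(3 == "3")       # False: an int never equals a str
    print(3 == int("3"))  # True once the string is converted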
    

    The fix is to make their data types match. The fix below converts kwargs["schedule_day"] to int:

    def _check_valid_day(**kwargs):
        today = datetime.today()
        if today.isoweekday() == int(kwargs["schedule_day"]): #convert to int
            print("inside the if statement")
            return True
        else:
            raise AirflowSkipException("does not correspond to the backup day")
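
    As an alternative (my preference, not part of the original fix), the cast can happen once where the environment variable is read, so the value is validated at DAG parse time and everything downstream already sees an int:

    # a sketch, assuming BACKUP_SCHEDULE_DAY should always hold a digit from 1 to 7
    schedule_day = int(os.environ["BACKUP_SCHEDULE_DAY"])
    if not 1 <= schedule_day <= 7:
        raise ValueError(f"BACKUP_SCHEDULE_DAY must be 1-7, got {schedule_day}")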
    

    Graph view:

    [screenshot: graph view of the DAG after the fix]

    check_valid_day logs:

    [screenshot: check_valid_day task logs]
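
    As a side note, Airflow also ships an operator for exactly this skip-downstream-on-condition pattern: ShortCircuitOperator skips all downstream tasks whenever its callable returns a falsy value, so there is no need to raise AirflowSkipException by hand. A minimal sketch, assuming the same BACKUP_SCHEDULE_DAY environment variable:

    import os
    from datetime import datetime

    from airflow.operators.python import ShortCircuitOperator

    def _is_backup_day():
        # A falsy return value short-circuits (skips) everything downstream
        return datetime.today().isoweekday() == int(os.environ["BACKUP_SCHEDULE_DAY"])

    check_valid_day = ShortCircuitOperator(
        task_id="check_valid_day",
        python_callable=_is_backup_day,
    )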