I am trying to backup the data in Airflow but it's not giving any error and also I am unable to backup the data because it's getting skipped. The code which I have written is :-
import os
from airflow import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import (
BigQueryToGCSOperator,
)
from composer_plugins import get_list_to_backup
from datetime import datetime, timedelta, date
from airflow.exceptions import AirflowFailException, AirflowSkipException
from airflow.operators.python import PythonOperator
"""the function validates if the schedule_day parameter is a valid day to execute the task schedule_day is a number and corresponds to day of the week
1 - Monday
2 - Tuesday
3 - Wednesday
4 - Thursday
5 - Friday
6 - Saturday
7 - Sunday """
def _check_valid_day(**kwargs):
today = datetime.today()
if today.isoweekday()==kwargs["schedule_day"]:
return True
else:
raise AirflowSkipException("does not correspond to the backup day")
today = datetime.today()
today_str = today.strftime("%Y-%m-%d")
#start_date = get_next_weekday(today_str, 5) # 5 = Saturday
start_date = datetime(2022, 5, 2)
dag_id = "data_bq_weekly_backups_dag"
event_collection_project_id = os.environ["EVENT_COLLECTION_PROJECT_ID"]
tables_to_backup = os.environ["TABLES_TO_BACKUP"]
destination_bucket = os.environ["WEEKLY_BQ_BACKUP_BUCKET"]
schedule_day =os.environ["BACKUP_SCHEDULE_DAY"]
default_dag_args = {
# Setting start date for next Saturday in order to maintain the scheduler
# in a consistent state
"start_date": start_date,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
"email_on_failure": False,
"email_on_retry": False,
# If a task fails, retry it once after waiting at least what's specified in retry_delay
"retries": 1,
"retry_delay": timedelta(seconds=10),
"project_id": event_collection_project_id,
"schedule_interval": "0 2 * * *",
}
tables_to_backup_list = get_list_to_backup(tables_to_backup)
with DAG(dag_id=dag_id, default_args=default_dag_args,catchup=False) as dag:
check_valid_day = PythonOperator(
task_id='check_valid_day',
python_callable=_check_valid_day,
op_kwargs={
"schedule_day": schedule_day
},
)
task_dict = dict()
for table_to_backup in tables_to_backup_list:
dataset = table_to_backup.split(".")[0]
table = table_to_backup.split(".")[1]
task_name = f"{dataset}_{table}_table_weekly_backup"
task_dict[task_name] = BigQueryToGCSOperator(
task_id=task_name,
trigger_rule="all_success",
dag=dag,
source_project_dataset_table=table_to_backup,
destination_cloud_storage_uris=[
f"gs://{destination_bucket}/{dataset}/{table}/{today.year}/{today.month}/{today.day}/{table}-*.avro"
],
export_format="AVRO", # OPTIONS: AVRO, CSV, JSON
compression="NONE", # OPTIONS: NONE, DEFLATE, GZIP, SNAPPY
labels=None,
)
check_valid_day >> task_dict[task_name]
When I am executing this DAG, there is no error but it's skipping everything :- Airflow DAG TREE VIEW BACKUP_SCHEDULE_DAY=3 is set in environment variable file. I don't know what's wrong in this and why it's not working
I tried your code I was able to reproduce your issue. See run history below:
NOTE: Prior to running your code, I hardcoded values like your environment variables and tables_to_backup_list
to make it work on my environment.
The main problem is in _check_valid_day()
. When this line is executed if today.isoweekday()==kwargs["schedule_day"]
it is always false because they have a data type mismatch.
print(today.isoweekday()) # <class 'int'>
print(kwargs["schedule_day"]) # <class 'str'>
The fix is to make their data types match. Fix below is to convert kwargs["schedule_day"]
to type int:
def _check_valid_day(**kwargs):
today = datetime.today()
if today.isoweekday() == int(kwargs["schedule_day"]): #convert to int
print("inside the if statement")
return True
else:
raise AirflowSkipException("does not correspond to the backup day")
Graph view:
check_valid_day
Logs: