python, airflow, schedule, airflow-scheduler

Not able to change the DAG schedule to execute on 2 days using PythonOperator


I need to change the DAG in order to adjust the schedule using the PythonOperator. The schedule must be Wednesday and Friday at midnight, i.e. 0 0 * * 3,5, but I don't know how to schedule on 2 days. So far I have only managed to schedule on Friday. Here is the code I have tried, which executes on Friday:

"""

This function validates whether the schedule_day parameter is a valid day to execute the task.
schedule_day is a number corresponding to a day of the week:
1 - Monday
2 - Tuesday
3 - Wednesday
4 - Thursday
5 - Friday
6 - Saturday
7 - Sunday
"""

# Imports (assuming Airflow 2 with the Google provider installed)
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator


def _check_valid_day(**kwargs):
    today = datetime.today()
    if today.isoweekday() == kwargs["schedule_day"]:
        return True
    else:
        raise AirflowSkipException("does not correspond to the backup day")

today = datetime.today()
today_str = today.strftime("%Y-%m-%d")
#start_date = get_next_weekday(today_str, 4)  # 4 = Friday
start_date = datetime(2022, 4, 2)
dag_id = "${ENV_NAME_UNDER}_data_bq_weekly_backups_dag"

event_collection_project_id = os.environ["EVENT_COLLECTION_PROJECT_ID"]
tables_to_backup = os.environ["TABLES_TO_BACKUP"]
destination_bucket = os.environ["WEEKLY_BQ_BACKUP_BUCKET"]
schedule_day = os.environ["BACKUP_SCHEDULE_DAY"]

default_dag_args = {
    # Setting start date for next Friday in order to maintain the scheduler
    # in a consistent state
    "start_date": start_date,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least what's specified in retry_delay
    "retries": 1,
    "retry_delay": timedelta(seconds=10),
    "project_id": event_collection_project_id,
    "schedule_interval": "0 0 * * 5",
}


tables_to_backup_list = get_list_to_backup(tables_to_backup)

with DAG(dag_id=dag_id, default_args=default_dag_args, catchup=False) as dag:

    check_valid_day = PythonOperator(
        task_id='check_valid_day',
        python_callable=_check_valid_day,
        op_kwargs={
            "schedule_day": schedule_day,
        },
    )
    task_dict = dict()

    for table_to_backup in tables_to_backup_list:
        dataset = table_to_backup.split(".")[0]
        table = table_to_backup.split(".")[1]
        task_name = f"{dataset}_{table}_table_weekly_backup"
        task_dict[task_name] = BigQueryToGCSOperator(
            task_id=task_name,
            trigger_rule="all_success",
            dag=dag,
            )
        check_valid_day >> task_dict[task_name]

Solution

  • I have tried:

        SCHEDULE_DAY = [3, 5]

        def _check_valid_day(**kwargs):
            today = datetime.today()
            if today.isoweekday() in SCHEDULE_DAY:
                return True
            else:
                raise AirflowSkipException("does not correspond to the backup day")
    
    
    inputFilePattern = f"gs://{geodata_bucket_name}/zips/*.zip"
    outputDirectory = f"gs://{geodata_bucket_name}/unzips/"
    output_failure_file_path = f"gs://{geodata_bucket_name}/unzips/failed.csv"
    
    default_dag_args = {
        # Setting start date as yesterday starts the DAG immediately when it is
        # detected in the Cloud Storage bucket.
        "owner": "airflow",
        "start_date": execution_date,
        "depends_on_past": False,
        # To email on failure or retry set 'email' arg to your email and enable
        # emailing here.
        "email_on_failure": False,
        "email_on_retry": False,
        # If a task fails, retry it once after waiting at least what's specified in retry_delay
        "retries": 1,
        "project_id": project_id,
        # "schedule_interval": GEODATA_SCHEDULE,
        "schedule_interval": "0 0 * * 3,5",
    }
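
    Putting this together, below is a minimal sketch of the two-day schedule. It assumes Airflow 2, that BACKUP_SCHEDULE_DAY contains a comma-separated list of ISO weekday numbers such as "3,5", and a placeholder dag_id; note that schedule_interval has to be passed to the DAG constructor, since Airflow does not pick it up from default_args:

        import os
        from datetime import datetime, timedelta

        from airflow import DAG
        from airflow.exceptions import AirflowSkipException
        from airflow.operators.python import PythonOperator

        # Assumed format: BACKUP_SCHEDULE_DAY="3,5" (ISO weekdays: Wednesday and Friday)
        schedule_days = [int(d) for d in os.environ["BACKUP_SCHEDULE_DAY"].split(",")]

        def _check_valid_day(schedule_days, **kwargs):
            # isoweekday() returns 1 (Monday) .. 7 (Sunday), so it can be compared
            # directly against the configured backup days once they are cast to int.
            if datetime.today().isoweekday() in schedule_days:
                return True
            raise AirflowSkipException("does not correspond to the backup day")

        with DAG(
            dag_id="data_bq_weekly_backups_dag",  # placeholder name
            start_date=datetime(2022, 4, 2),
            # Both days go into the cron expression; schedule_interval is a DAG
            # argument, not a default_args entry.
            schedule_interval="0 0 * * 3,5",
            catchup=False,
            default_args={"retries": 1, "retry_delay": timedelta(seconds=10)},
        ) as dag:
            check_valid_day = PythonOperator(
                task_id="check_valid_day",
                python_callable=_check_valid_day,
                op_kwargs={"schedule_days": schedule_days},
            )

    The per-table BigQueryToGCSOperator tasks from the question can then be attached downstream of check_valid_day exactly as before.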