In Airflow, I know you can automatically send Slack notifications with on_success_callback and on_failure_callback, and both have worked properly in my case.
In my use case, I have an ETL task that raises AirflowSkipException when the current day's data is empty, which also works as intended. However, this still sends a success notification to my Slack channel.
I was wondering if there is something like on_skip_callback, or some other way to notify my Slack channel that my DAG was skipped for the current day.
Any help would be appreciated. Thanks.
Edit: Added a code reference for my ETL. The datapoints are read from a database and can change from day to day; on some days there is no data to process, so datapoints is empty.
    from airflow.exceptions import AirflowSkipException
    # Airflow 1.10 import path; in Airflow 2.x use airflow.operators.python
    from airflow.operators.python_operator import PythonOperator

    def ETL_function(**kwargs):  # provide_context=True passes the task context as keyword arguments
        # Retrieve data code
        ...

        # Validation to check if ETL data is empty
        if not datapoints:
            print("OUTPUT LOG : ETL Data not found/empty")
            print("OUTPUT LOG : ETL skipped due to empty data, Skipping ETL ...... ")
            raise AirflowSkipException
            # return False
        else:
            print("OUTPUT LOG : ETL Data found")
            print("OUTPUT LOG : ETL continued due data available , Running ETL ...... ")
            # return True

        # ETL Process code
        ...

    ETL_function_Task = PythonOperator(
        task_id='ETL_function',
        provide_context=True,
        python_callable=ETL_function,  # must reference the function defined above
        on_success_callback=task_success_slack_alert,
        dag=dag,
    )
Hi @Anindhito Irmandharu,
You can use the ShortCircuitOperator, which is derived from the PythonOperator, for this purpose.
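    # Airflow 1.10 import path; in Airflow 2.x use airflow.operators.python
    from airflow.operators.python_operator import ShortCircuitOperator

    def ETL_function(**context):  # accepts the context passed by provide_context=True
        ...
        # Validation to check if ETL data is empty
        if not datapoints:
            print("OUTPUT LOG : ETL Data not found/empty")
            print("OUTPUT LOG : ETL skipped due to empty data, Skipping ETL ...... ")
            return False  # falsy result: downstream tasks are skipped
        else:
            print("OUTPUT LOG : ETL Data found")
            print("OUTPUT LOG : ETL continued due data available , Running ETL ...... ")
            return True  # truthy result: downstream tasks run

    ETL_function_Task = ShortCircuitOperator(
        task_id="ETL_function",
        python_callable=ETL_function,
        provide_context=True,
        dag=dag,
    )

    ETL_function_Task >> downstream_Tasks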
NOTE: Your downstream tasks will be skipped, but the task itself, 'ETL_function_Task', will end in the success state. I'm not exactly sure why you need to send a Slack notification for tasks that execute successfully, but you can easily alter on_success_callback=task_success_slack_alert to suit your requirement. For example, write a new task_skipped_slack_alert in the slack_hook you are using.
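As a rough sketch of that last suggestion (untested against your DAG, so treat it as a starting point): task_skipped_slack_alert below formats a skip message from the task context and hands it to a sender. The send parameter, the should_run_etl helper, and the message wording are all hypothetical names I made up for illustration; in your project, send would be whatever function your existing Slack hook code uses to post a message. I'm assuming the context contains the usual task_instance and ds entries.

```python
def task_skipped_slack_alert(context, send=print):
    """Build a 'skipped' message from the task context and pass it to `send`.

    `send` is a placeholder (assumption): swap in the function from your
    own Slack hook code that posts a text message to your channel.
    """
    ti = context["task_instance"]
    message = (
        ":fast_forward: Task skipped\n"
        f"*DAG*: {ti.dag_id}\n"
        f"*Task*: {ti.task_id}\n"
        f"*Execution date*: {context['ds']}\n"
        "*Reason*: no data found for this run"
    )
    send(message)
    return message


def should_run_etl(datapoints, context, send=print):
    """Hypothetical condition for the ShortCircuitOperator callable.

    Sends the skip alert and returns False when there is no data,
    otherwise returns True so downstream tasks run.
    """
    if not datapoints:
        task_skipped_slack_alert(context, send=send)
        return False
    return True
```

Inside ETL_function you would then end with something like return should_run_etl(datapoints, context). If you prefer to keep the original PythonOperator version, the same task_skipped_slack_alert(context) call could go right before raise AirflowSkipException instead.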