Multiple tasks run inside a DAG, as defined in the code below.
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

from util.email_util import Email
from util.slack_alert_util import task_failure_alert
def dag_failure_notification_alert(context):
    # Slack notification
    logging.info("Sending DAG Slack notification")
    task_failure_alert(context)

    # Email notification
    subject = 'DAG Failure Alert'
    from_email = 'abcd@xyz.com'
    to_email = ['abcd@xyz.com']
    dag_name = context['dag'].dag_id  # read the id directly rather than slicing the repr string
    dag_run = str(context['dag_run'])
    message_body = """
    <html>
        <body>
            <strong>Airflow DAG Failure Report</strong><br /><br />
            Dag Name: {}<br />
            Dag run details: {}<br />
            Execution date and time: {}<br />
            Run ID: {}<br />
            Task Instance Key: {}<br />
            Exception: {}<br />
        </body>
    </html>
    """.format(dag_name, dag_run, str(context['execution_date']), str(context['run_id']),
               str(context['task_instance_key_str']), str(context.get('exception')))
    logging.info("Message body created for DAG as: %s", message_body)
    email_obj = Email(
        {'Subject': subject, 'From': from_email, 'To': to_email, 'body': message_body,
         'file': None, 'filename': '', 'body_type': 'html'})
    email_obj.send()
def task_failure_notification_alert(context):
    # Slack notification
    logging.info("Sending Task Slack notification")
    task_failure_alert(context)
default_args = {
    "owner": "analytics",
    "start_date": datetime(2021, 12, 12),
    'retries': 0,
    'retry_delay': timedelta()
}

dag = DAG('test_alert_notification',
          default_args=default_args,
          schedule_interval="@daily",  # schedule_interval belongs on the DAG itself, not in default_args
          catchup=False,
          on_failure_callback=dag_failure_notification_alert
          )
start_task = DummyOperator(task_id="start_task", dag=dag, on_failure_callback=task_failure_notification_alert)
end_task = DummyOperator(task_id="end_task", dag=dag, on_failure_callback=task_failure_notification_alert)
create_table_sql_query = '''
CREATE TABLE dummy_table (id INT NOT NULL, name VARCHAR(250) NOT NULL);
'''
for i in range(5):
    create_table_task = PostgresOperator(
        sql=create_table_sql_query,
        task_id=str(i),
        postgres_conn_id="postgres_dummy_test",
        dag=dag,
        on_failure_callback=task_failure_notification_alert
    )
    start_task >> create_table_task >> end_task
DAG graph for the above code: start_task fans out to the five parallel Postgres tasks (0 to 4), which all converge on end_task.
As the DAG graph shows, if any of the parallel Postgres tasks (0, 1, 2, 3, 4) fails, its on_failure_callback calls the Python function task_failure_notification_alert with the task context to send a Slack notification.
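The task_failure_alert helper imported from util.slack_alert_util is not shown in the question. For reference, here is a minimal sketch of what such a helper could look like, assuming a plain Slack incoming-webhook URL (the URL constant and message layout below are my assumptions, not the original util code):

import logging
import requests

# Hypothetical webhook URL; the real one comes from your Slack app settings
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

def task_failure_alert(context):
    # Build a short failure summary from the callback context
    message = (
        "DAG FAIL ALERT\n"
        f"dag: {context['dag']},\n"
        f"dag_run: {context['dag_run']},\n"
        f"execution_date: {context['execution_date']},\n"
        f"run_id: {context['run_id']},\n"
        f"task_instance_key_str: {context['task_instance_key_str']}\n"
        f"exception: {context.get('exception')}"
    )
    # Post the summary to the Slack channel behind the webhook
    response = requests.post(SLACK_WEBHOOK_URL, json={"text": message})
    logging.info("Slack responded with status %s", response.status_code)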
On DAG failure, the on_failure_callback set on the DAG object invokes dag_failure_notification_alert with the context, which sends both a Slack and an email notification.
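Likewise, the Email class from util.email_util is not shown. A minimal standard-library sketch that matches the dict keys used above (the SMTP host 'localhost' is a placeholder assumption):

import smtplib
from email.message import EmailMessage

class Email:
    # Sketch of the unshown util.email_util.Email helper
    def __init__(self, params):
        self.params = params

    def send(self):
        msg = EmailMessage()
        msg['Subject'] = self.params['Subject']
        msg['From'] = self.params['From']
        msg['To'] = ', '.join(self.params['To'])
        if self.params.get('body_type') == 'html':
            # Attach the body as an HTML alternative part
            msg.add_alternative(self.params['body'], subtype='html')
        else:
            msg.set_content(self.params['body'])
        with smtplib.SMTP('localhost') as smtp:  # placeholder SMTP host
            smtp.send_message(msg)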
In case of a task failure, the output looks like this:
DAG FAIL ALERT
dag: <DAG: test_alert_notification>,
dag_run: <DagRun test_alert_notification @ 2022-11-29 12:03:13.245324+00:00: manual__2022-11-29T12:03:13.245324+00:00, externally triggered: True>,
execution_date: 2022-11-29T12:03:13.245324+00:00,
run_id: manual__2022-11-29T12:03:13.245324+00:00,
task_instance_key_str: test_alert_notification__4__20221129
exception: The conn_id postgres_dummy_test isn't defined
or
DAG FAIL ALERT
dag: <DAG: test_alert_notification>,
dag_run: <DagRun test_alert_notification @ 2022-11-29 12:03:13.245324+00:00: manual__2022-11-29T12:03:13.245324+00:00, externally triggered: True>,
execution_date: 2022-11-29T12:03:13.245324+00:00,
run_id: manual__2022-11-29T12:03:13.245324+00:00,
task_instance_key_str: test_alert_notification__5__20221129
exception: The conn_id postgres_dummy_test isn't defined
and similarly for each failed task.
On DAG failure, the context contains exception as None and only a single task instance key, which is that of the last successful task.
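To verify what the DAG-level callback actually receives, the relevant context entries can be logged before building the message; a small sketch (the helper name is mine):

def log_callback_context(context):
    # Log the context entries used by the alert, to see what is populated
    for key in ('dag', 'dag_run', 'execution_date', 'run_id',
                'task_instance_key_str', 'exception'):
        logging.info("context[%s] = %r", key, context.get(key))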
DAG failure Output format:
DAG FAIL ALERT
dag: <DAG: test_alert_notification>,
dag_run: <DagRun test_alert_notification @ 2022-11-30 09:33:02.032456+00:00: manual__2022-11-30T09:33:02.032456+00:00, externally triggered: True>,
execution_date: 2022-11-30T09:33:02.032456+00:00,
run_id: manual__2022-11-30T09:33:02.032456+00:00,
task_instance_key_str: test_alert_notification__start_task__20221130
exception: None
I want to pass the task failure information, i.e. the exceptions and task instances, to dag_failure_notification_alert so it can send a single email with the accumulated information of all failed tasks.
As I am new to Airflow, kindly suggest the best way to do this. Is there any other method that suits this kind of requirement?
Here is the solution I found for it, from a Stack Overflow answer: we can get the list of failed tasks using only the passed context.
e.g.

from airflow.utils.state import TaskInstanceState

ti = context['task_instance']
for t in ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED):  # type: TaskInstance
    logging.info(f'failed dag: {t.dag_id}, task: {t.task_id}, url: {t.log_url}')
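The same list is also reachable through the dag_run object already present in the context, which skips the task_instance hop; a small sketch under the same assumptions:

from airflow.utils.state import TaskInstanceState

def failed_task_instances(context):
    # context['dag_run'] is the DagRun this callback fired for
    return context['dag_run'].get_task_instances(state=TaskInstanceState.FAILED)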
Updating dag_failure_notification_alert as follows:
from airflow.utils.state import TaskInstanceState

def dag_failure_notification_alert(context):
    # Slack notification
    logging.info("Sending DAG Slack notification")
    task_failure_alert(context)

    # Collect every failed task instance from the current DAG run
    failed_tasks = []
    dag_name = None
    ti = context['task_instance']
    for t in ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED):  # type: TaskInstance
        logging.info(f'failed dag: {t.dag_id}, task: {t.task_id}, url: {t.log_url}')
        dag_name = t.dag_id
        failed_tasks.append({'id': t.task_id, 'url': t.log_url})

    if failed_tasks:
        # Email notification
        subject = 'DAG Failure Alert'
        from_email = 'abcd@xyz.com'
        to_email = ['abcd@xyz.com']
        # Build one link per failed task, pointing at its log page
        task_url_link = ", ".join(
            '<a href="{}">{}</a>'.format(failed_task['url'], failed_task['id'])
            for failed_task in failed_tasks
        )
        message_body = """
        <html>
            <body>
                <strong>Airflow DAG Failure Report</strong><br /><br />
                <b>Dag Name:</b> {}<br />
                <b>Task Details:</b> [{}]<br />
                <br />
                Thanks,<br />
            </body>
        </html>
        """.format(dag_name, task_url_link)
        logging.info("Message body created for DAG as: %s", message_body)
        email_obj = Email(
            {'Subject': subject, 'From': from_email, 'To': to_email, 'body': message_body,
             'file': None, 'filename': '', 'body_type': 'html'})
        email_obj.send()
    else:
        logging.info("No failed tasks fetched.")
I hope this helps anyone who faces the same issue; that's why I posted this answer.
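One possible refinement (my suggestion, not part of the original answer): tasks that never ran because an upstream task failed end up in the UPSTREAM_FAILED state and are missed by the FAILED-only filter. get_task_instances accepts a list of states, so both can be collected:

from airflow.utils.state import TaskInstanceState

states = [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
for t in ti.get_dagrun().get_task_instances(state=states):
    logging.info(f'dag: {t.dag_id}, task: {t.task_id}, state: {t.state}, url: {t.log_url}')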