airflow, directed-acyclic-graphs

Pass context from Task to DAG in case of success or failure


There are multiple tasks running inside a DAG, as per the code below.

import logging
from airflow import DAG
from datetime import datetime, timedelta
from util.email_util import Email
from util.slack_alert_util import task_failure_alert
from airflow.operators.dummy import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator


def dag_failure_notification_alert(context):
    # Slack notification
    logging.info("Sending DAG Slack notification")
    task_failure_alert(context)

    # Email notification
    subject = 'DAG Failure Alert'
    from_email = 'abcd@xyz.com'
    to_email = ['abcd@xyz.com']
    dag_name = str(context['dag'])[6:-1]
    dag_run = str(context['dag_run'])[8:-1]
    message_body = """
                    <html>
                        <body>
                        <strong>Airflow DAG Failure Report</strong><br /><br />
                        Dag Name: {}<br />
                        Dag run details: {}<br />
                        Execution date and time: {}<br />
                        Run ID: {}<br />
                        Task Instance Key: {}<br />
                        Exception: {}<br />
                        </body>
                    </html>
                    """.format(dag_name, dag_run, str(context['execution_date']), str(context['run_id']),
                               str(context['task_instance_key_str']), str(context.get('exception')))
    logging.info("Message body created for DAG as: %s", message_body)
    email_obj = Email(
        {'Subject': subject, 'From': from_email, 'To': to_email, 'body': message_body, 'file': None, 'filename': '',
         'body_type': 'html'})
    email_obj.send()


def task_failure_notification_alert(context):
    # Slack notification
    logging.info("Sending Task Slack notification")
    task_failure_alert(context)


default_args = {
    "owner": "analytics",
    "start_date": datetime(2021, 12, 12),
    'retries': 0,
    'retry_delay': timedelta()
}

dag = DAG('test_alert_notification',
          default_args=default_args,
          # schedule_interval is a DAG argument, so it belongs here rather than in default_args
          schedule_interval="@daily",
          catchup=False,
          on_failure_callback=dag_failure_notification_alert
          )

start_task = DummyOperator(task_id="start_task", dag=dag, on_failure_callback=task_failure_notification_alert)
end_task = DummyOperator(task_id="end_task", dag=dag, on_failure_callback=task_failure_notification_alert)

create_table_sql_query = '''
CREATE TABLE dummy_table (id INT NOT NULL, name VARCHAR(250) NOT NULL);
'''
for i in range(5):
    create_table_task = PostgresOperator(
        sql=create_table_sql_query,
        task_id=str(i),
        postgres_conn_id="postgres_dummy_test",
        dag=dag,
        on_failure_callback=task_failure_notification_alert
    )
    start_task >> create_table_task >> end_task

The DAG graph for the above code:

(DAG graph image: start_task fans out to the five parallel Postgres tasks 0-4, which all feed into end_task)

As the DAG graph above shows, if any of the parallel Postgres tasks, i.e. 0, 1, 2, 3, 4, fails, its on_failure_callback calls the Python function task_failure_notification_alert with the context to send a Slack notification.

On top of that, in case of DAG failure the DAG-level on_failure_callback calls dag_failure_notification_alert with the context, which sends both Slack and email notifications.
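
For reference, a task-level and a DAG-level on_failure_callback both receive the same kind of context dictionary. The sketch below (inspect_failure_context is a made-up name, used only for illustration) simply logs the context keys already used in the report above; exact key availability can differ between Airflow versions.

import logging


def inspect_failure_context(context):
    # Log the context keys used by the failure report above. In this setup,
    # 'exception' is only populated for task-level failure callbacks.
    logging.info("dag: %s", context['dag'])
    logging.info("dag_run: %s", context['dag_run'])
    logging.info("execution_date: %s", context['execution_date'])
    logging.info("run_id: %s", context['run_id'])
    logging.info("task_instance_key_str: %s", context['task_instance_key_str'])
    logging.info("exception: %s", context.get('exception'))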

In case of a task failure, the output looks like this:

DAG FAIL ALERT
    dag: <DAG: test_alert_notification>,
    dag_run: <DagRun test_alert_notification @ 2022-11-29 12:03:13.245324+00:00: manual__2022-11-29T12:03:13.245324+00:00, externally triggered: True>,
    execution_date: 2022-11-29T12:03:13.245324+00:00,
    run_id: manual__2022-11-29T12:03:13.245324+00:00,
    task_instance_key_str: test_alert_notification__4__20221129
    exception: The conn_id postgres_dummy_test isn't defined

or

DAG FAIL ALERT
    dag: <DAG: test_alert_notification>,
    dag_run: <DagRun test_alert_notification @ 2022-11-29 12:03:13.245324+00:00: manual__2022-11-29T12:03:13.245324+00:00, externally triggered: True>,
    execution_date: 2022-11-29T12:03:13.245324+00:00,
    run_id: manual__2022-11-29T12:03:13.245324+00:00,
    task_instance_key_str: test_alert_notification__5__20221129
    exception: The conn_id postgres_dummy_test isn't defined

for each failed task.

On DAG failure, the context contains the exception as None and only a single task instance key, which is that of the last successful task.

DAG failure output format:

DAG FAIL ALERT
    dag: <DAG: test_alert_notification>,
    dag_run: <DagRun test_alert_notification @ 2022-11-30 09:33:02.032456+00:00: manual__2022-11-30T09:33:02.032456+00:00, externally triggered: True>,
    execution_date: 2022-11-30T09:33:02.032456+00:00,
    run_id: manual__2022-11-30T09:33:02.032456+00:00,
    task_instance_key_str: test_alert_notification__start_task__20221130
    exception: None
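
Given the two output shapes above, a single shared callback could, as a rough sketch, tell a DAG-level invocation apart from a task-level one by whether an exception is present (shared_failure_alert is a made-up name; this only reflects the behaviour observed here and is not verified across Airflow versions):

import logging


def shared_failure_alert(context):
    # In the outputs above, the DAG-level call arrives with exception == None,
    # while the task-level calls carry the actual exception.
    if context.get('exception') is None:
        logging.info("DAG-level failure callback for %s", context['task_instance_key_str'])
    else:
        logging.info("Task-level failure: %s", context['exception'])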

I want to pass task failure information, i.e. the exceptions and task instances, to dag_failure_notification_alert so that it can send an email with the accumulated information of all failed tasks.

  1. I tried using common global variables, i.e. exceptions and task_instances as lists, appending all task exceptions and task instances to them inside the task_failure_notification_alert function and later reading the same variables inside the dag_failure_notification_alert function, but it didn't work.
  2. I tried using a Python callback as mentioned here, but it works with PythonOperator only.
  3. I read about the XCom push and pull mechanism, but it focuses on sharing data between tasks (if I understand it correctly), and I am unsure how to use it here (a rough sketch of this idea follows this list).
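
Regarding point 3, here is the rough, unverified sketch mentioned above of how XCom could be wired into such callbacks (the function names and the failure_info key are made up, and it assumes an XCom pushed from inside a failure callback is persisted): the task-level callback pushes this task's failure details, and the DAG-level callback pulls them back for every task instance of the run.

import logging


def push_failure_info(context):
    # Task-level on_failure_callback: store this task's failure details as an XCom.
    # Assumption: the value must be JSON-serializable with the default XCom backend.
    ti = context['task_instance']
    ti.xcom_push(key='failure_info',
                 value={'task_id': ti.task_id,
                        'exception': str(context.get('exception'))})


def collect_failure_info(context):
    # DAG-level on_failure_callback: pull the pushed details for every task
    # instance of this DAG run and accumulate them.
    failures = []
    for ti in context['dag_run'].get_task_instances():
        info = ti.xcom_pull(task_ids=ti.task_id, key='failure_info')
        if info:
            failures.append(info)
    logging.info("Accumulated failure info: %s", failures)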

As I am new to Airflow, kindly suggest the best way to do this. Is there any other method which suits this kind of requirement better?


Solution

  • Here is the solution I found for it from a Stack Overflow answer. We can get the list of failed tasks by using the passed context only.

    e.g.

    from airflow.utils.state import TaskInstanceState

    ti = context['task_instance']
    for t in ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED):  # type: TaskInstance
        logging.info(f'failed dag: {t.dag_id}, task: {t.task_id}, url: {t.log_url}')
    

    Updating the dag_failure_notification_alert function as follows:

    from airflow.utils.state import TaskInstanceState


    def dag_failure_notification_alert(context):
        # Slack notification
        logging.info("Sending DAG Slack notification")
        task_failure_alert(context)
    
        failed_tasks = []
        dag_name = None
        ti = context['task_instance']
        for t in ti.get_dagrun().get_task_instances(state=TaskInstanceState.FAILED):  # type: TaskInstance
            logging.info(f'failed dag: {t.dag_id}, task: {t.task_id}, url: {t.log_url}')
            dag_name = t.dag_id
            failed_tasks.append({'id': t.task_id, 'url': t.log_url})
    
        if failed_tasks:
            # Email notification
            subject = 'DAG Failure Alert'
            from_email = 'abcd@xyz.com'
            to_email = ['abcd@xyz.com']
            task_url_link = ""
            for failed_task in failed_tasks:
                task_url_link += """<a href="{}">{}</a>, """.format(failed_task['url'], failed_task['id'])
            task_url_link = task_url_link[:-2]
            message_body = """
                            <html>
                                <body>
                                <strong>Airflow DAG Failure Report</strong><br /><br />
                                <b>Dag Name:</b> {}<br />
                                <b>Task Details:</b> [{}]<br />
                                <br />
                                Thanks,<br />
                                </body>
                            </html>
                            """.format(dag_name, task_url_link)
            logging.info("Message body created for DAG as: %s", message_body)
            email_obj = Email(
                {'Subject': subject, 'From': from_email, 'To': to_email, 'body': message_body, 'file': None, 'filename': '',
                 'body_type': 'html'})
            email_obj.send()
        else:
            logging.info("No failure Tasks fetched.")
    
    

    I hope this will help anyone who faces the same issue; that's why I posted the answer.