I am trying to create a Slack alert that posts some basic information into a Slack channel when a task in an Airflow DAG runs successfully. My code is as follows:
import datetime
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
SLACK_CONN_ID = "slack_test"
def task_success_slack_alert(context):
    """
    Callback task that can be used in DAG to alert of successful task completion
    Args:
        context (dict): Context variable passed in from Airflow
    Returns:
        None: Calls the SlackWebhookOperator execute method internally
    """
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = """
        :large_blue_circle: Task completed successfully
        *Task*: {task}
        *Dag*: {dag}
        *Execution Time*: {exec_date}
        *Start Time*: {start}
        *End Time*: {end}
        *Duration*: {duration} seconds
        *Try* {try_number} *of max retries* {max_tries}
        *Log Url*: {log_url}
        """.format(
        task=context.get("task_instance").task_id,
        dag=context.get("task_instance").dag_id,
        ti=context.get("task_instance"),
        exec_date=context.get("execution_date"),
        log_url=context.get("task_instance").log_url,
        start=context.get("task_instance").start_date,
        end=context.get("task_instance").end_date,
        duration=context.get("task_instance").duration,
        try_number=context.get("task_instance")._try_number,
        max_tries=context.get("task_instance").max_tries + 1,
    )
    success_alert = SlackWebhookOperator(
        task_id="slack_test",
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username="airflow",
    )
    return success_alert.execute(context=context)
default_dag_args = {
    'owner': 'TEST',
    'start_date': datetime.datetime(2021, 3, 29),
    'retries': 1,
    'use_legacy_sql': False,
    'on_success_callback': task_success_slack_alert,
}
with models.DAG('TESTING',
                schedule_interval='0 9 * * *',
                default_args=default_dag_args) as dag:
I then have a series of BigQueryOperator tasks that write data into BigQuery tables. The Slack message is posted after every task runs; however, sometimes the 'End Time' and 'Duration' are 'None', even though when I check the logs for the DAG the end time is recorded correctly. Sometimes the Duration/End Time are populated and sometimes they aren't, and I can't see a logical pattern as to why. I have also noticed that sometimes the Slack alert is posted twice for the same task, once with the Duration/End Time as 'None' and then again with them populated. Does anyone know why this is happening?
There are two points where on_success_callback can be called, and there are cases where callbacks are executed twice in quick succession due to race conditions between the task instance execution and the local task job. This GitHub issue describes a scenario similar to yours.
Without knowing the time when the Slack messages are sent, I can't say for sure it is the same issue. If I had to guess, the Slack message with the missing end time and duration is from the local task job, and the one with the metadata filled in is from the task instance.
If you add some caller inspection code in your callback, you can figure out where it's coming from. Please reference this StackOverflow post on getting caller information.
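As a rough illustration, something along these lines at the top of the callback would log both when it fired and which file invoked it, using the standard inspect module (the interesting frame may sit at a different depth depending on your Airflow version):

import datetime
import inspect

def task_success_slack_alert(context):
    # Diagnostic only: record when the callback fired and which file called it,
    # so the task log shows whether it came from taskinstance.py or local_task_job.py.
    caller = inspect.stack()[1]
    print("on_success_callback fired at %s from %s:%s (%s)"
          % (datetime.datetime.utcnow(), caller.filename, caller.lineno, caller.function))
    # ... rest of the callback as before ...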
You mentioned that the callback from local_task_job.py has the full set of metadata in its output, which does make sense from the code perspective. end_date is not yet set at the point where on_success_callback is invoked in taskinstance.py. On the other hand, when the local task job heartbeats and sees that the task's state is SUCCESS, it runs on_success_callback as well, and by that point end_date has been set.
That reminds me: in the past I ran into this issue as well and worked around it by using the current time, captured when the success callback is invoked, as the end time.
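A minimal sketch of that fallback, assuming the same callback structure as above (airflow.utils.timezone.utcnow() keeps the timestamp timezone-aware like Airflow's own dates; the substitution only kicks in when end_date/duration have not been populated yet):

from airflow.utils import timezone

def task_success_slack_alert(context):
    ti = context.get("task_instance")
    end = ti.end_date or timezone.utcnow()  # fall back to "now" when end_date isn't set yet
    duration = ti.duration
    if duration is None and ti.start_date is not None:
        # derive a best-effort duration from the fallback end time
        duration = (end - ti.start_date).total_seconds()
    # ... build slack_msg using `end` and `duration` instead of ti.end_date / ti.duration ...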