I want to implement an email alert that is sent whenever my Celery worker loses its connection to my Redis server.
Whenever it loses the connection, it logs this warning:
[2023-03-02 21:33:48,272: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
It then starts trying to reconnect to the server:
[2023-03-02 21:33:48,286: ERROR/MainProcess] consumer: Cannot connect to redis://127.0.0.1:6379//: Error 61 connecting to 127.0.0.1:6379. Connection refused.. Trying again in 2.00 seconds... (1/100)
I am using Django 3.2.8 and Celery 5.2.7.
I went through the Celery docs and source code and learned that there is a method, on_connection_error_after_connected,
in celery.worker.consumer.consumer
that gets triggered when the connection is lost.
Okay, I found a solution. I set up a separate log handler that ships the Celery logs and also checks each record for the warning or error messages that indicate a lost connection. The code is here:
# Project/celery.py
from __future__ import absolute_import
import os
from celery import Celery
import logging
import celery
import logstash
from django.conf import settings
from utils.email_alert import send_shutting_down_email
import os
# Tell Django which settings module to load; setdefault leaves any
# DJANGO_SETTINGS_MODULE value already present in the environment untouched.
# NOTE(review): other parts of this file spell the package 'project'
# (lowercase) — confirm which casing is correct.
os.environ.setdefault('DJANGO_SETTINGS_MODULE','Project.settings')
# Celery application object for this project.
app = Celery('Project')
# Read the Celery configuration from the Django settings object.
app.config_from_object('django.conf:settings')
# Discover task modules in the installed Django apps; passing a lambda defers
# reading INSTALLED_APPS until Celery calls it.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
class LogstashEmailHandler(logging.handlers.SocketHandler):
    """Socket log handler that also e-mails an alert on broker-connection loss.

    Every record is first forwarded over the socket exactly as
    ``SocketHandler`` does.  The rendered log message is then inspected for
    the two Celery consumer messages this file cares about, and
    ``send_shutting_down_email`` is called when one of them is found.

    Args:
        host: Host name passed to ``SocketHandler``.
        port: Port passed to ``SocketHandler``.
        sender: Stored on the instance as ``self.sender``; not otherwise
            used by this class.
    """

    def __init__(self, host, port, sender):
        super().__init__(host, port)
        self.sender = sender

    def emit(self, record):
        """Forward *record* over the socket, then send an alert if it matches.

        Failures while forwarding or while alerting are reported through
        ``handleError`` so they never propagate into the code that logged.
        """
        try:
            # Send the log message to Logstash
            super().emit(record)
        except Exception:
            self.handleError(record)

        try:
            # ``record.message`` only exists once some Formatter has formatted
            # the record; ``getMessage()`` renders msg % args unconditionally.
            message = record.getMessage()
            if 'consumer: Connection to broker lost.' in message:
                send_shutting_down_email(message, 'Connection to broker lost.')
                # NOTE(review): this shell pipeline sends SIGHUP to every
                # process whose ``ps`` line contains "celery" and then starts
                # a worker; os.system() blocks until the command returns.
                # Consider delegating restarts to a process supervisor.
                os.system("ps auxww | grep celery | grep -v 'grep' | awk '{print $2}' | xargs kill -HUP && celery -A project worker -l info")
            elif 'Trying again in 2.00 seconds... (1/100)' in message:
                send_shutting_down_email(message, 'Trying to connect to broker url.')
        except Exception:
            # A failing alert (e.g. mail backend down) must not break logging.
            self.handleError(record)
from celery.signals import after_setup_task_logger
from celery.signals import after_setup_logger
@after_setup_task_logger.connect
@after_setup_logger.connect
def initialize_logstash(logger=None, loglevel=logging.INFO, **kwargs):
    """Attach the Logstash shipper and the e-mail alert handler to *logger*.

    Registered for both ``after_setup_logger`` and
    ``after_setup_task_logger``, so it runs for each logger those signals
    hand over.

    Args:
        logger: Logger that receives the two handlers.
        loglevel: Level applied to the Logstash TCP handler only.
        **kwargs: Remaining signal arguments; ``sender`` (if present) is
            passed on to ``LogstashEmailHandler``.

    Returns:
        The same ``logger`` that was passed in.
    """
    logstash_port = 5960
    environment_tag = 'celery-' + settings.ENV_NAME

    # Ships records to Logstash over TCP.
    shipper = logstash.TCPLogstashHandler(
        settings.LOGSTASH_HOST,
        logstash_port,
        tags=[environment_tag],
        message_type='celery',
        version=1,
    )
    shipper.setLevel(loglevel)
    logger.addHandler(shipper)

    # Watches records for broker-connection messages and e-mails an alert.
    alerter = LogstashEmailHandler(
        host=settings.LOGSTASH_HOST,
        port=logstash_port,
        sender=kwargs.get('sender'),
    )
    logger.addHandler(alerter)

    return logger
Here, send_shutting_down_email
is the function that takes the message and the subject as arguments and sends the email.
# send_shutting_down_email
from django.core.mail import EmailMultiAlternatives
from django.template.loader import render_to_string
from django.utils.html import strip_tags
from project.settings import ENV_NAME
def send_shutting_down_email(msg, sub):
    """Send a Celery alert e-mail with both HTML and plain-text bodies.

    Args:
        msg: Text handed to the ``mails/celery.html`` template as ``msg``.
        sub: Short description appended to the e-mail subject line.
    """
    # Render the HTML body first; the plain-text body is derived from it.
    html_body = render_to_string(
        'mails/celery.html',
        {'msg': msg, 'env': ENV_NAME},
    )
    plain_body = strip_tags(html_body)

    email = EmailMultiAlternatives(
        subject=f'Project Celery {ENV_NAME} message: {sub}',
        body=plain_body,
        from_email="alert@dummyemail.com",
        to=['email@dummyemail.com'],
    )
    email.attach_alternative(html_body, "text/html")
    email.send()
I believe there is a better way of doing this. If there is, please let me know!