Tags: python, django, redis, celery, worker

Set up an email notification when "consumer: Connection to broker lost" appears in Celery


I want to implement an email notification that is sent whenever my Celery worker loses its connection to my Redis server.

Whenever it loses the connection it logs a warning: [2023-03-02 21:33:48,272: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...

and then starts reconnecting to the server: [2023-03-02 21:33:48,286: ERROR/MainProcess] consumer: Cannot connect to redis://127.0.0.1:6379//: Error 61 connecting to 127.0.0.1:6379. Connection refused.. Trying again in 2.00 seconds... (1/100)
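
As far as I understand, this retry loop is driven by Celery's broker connection settings; the "(1/100)" comes from the default broker_connection_max_retries of 100. A minimal sketch of where those knobs live (the values below are just the defaults, set wherever the Celery app is configured):

    # retry behaviour for the broker connection (defaults shown)
    app.conf.broker_connection_retry = True        # keep retrying after the connection to the broker is lost
    app.conf.broker_connection_max_retries = 100   # give up after this many attempts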

I am using Django 3.2.8 and Celery 5.2.7.

I went through the Celery docs and source code and found that there is a method, on_connection_error_after_connected, in celery.worker.consumer.consumer that gets triggered when the connection is lost.
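
From what I can tell from the 5.2 source, that hook receives the exception that broke the connection, so one idea would be to wrap it so an alert goes out before the retry loop starts. A rough, untested sketch (send_alert_email is just a placeholder for whatever notifier you use):

    # somewhere that is imported before the worker starts, e.g. the Celery app module
    from celery.worker.consumer import Consumer

    def send_alert_email(message):
        # placeholder: plug in django.core.mail.send_mail or similar here
        print(message)

    _original_hook = Consumer.on_connection_error_after_connected

    def _alerting_hook(self, exc):
        # notify first, then let Celery's normal reconnect logic run
        send_alert_email(f'Broker connection lost: {exc!r}')
        return _original_hook(self, exc)

    Consumer.on_connection_error_after_connected = _alerting_hook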


Solution

  • Okay, I found a solution. I set up a separate log handler that forwards the Celery logs and also checks each record for the warning/error messages that indicate a lost connection. The code for that is here:

    # Project/celery.py
    from __future__ import absolute_import
    import logging
    import logging.handlers
    import os

    import logstash
    from celery import Celery
    from celery.signals import after_setup_logger, after_setup_task_logger
    from django.conf import settings

    from utils.email_alert import send_shutting_down_email

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'Project.settings')
    app = Celery('Project')
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


    class LogstashEmailHandler(logging.handlers.SocketHandler):
        """Forwards records to Logstash and emails an alert when the broker connection drops."""

        def __init__(self, host, port, sender):
            super().__init__(host, port)
            self.sender = sender

        def emit(self, record):
            try:
                # Send the log record to Logstash over the socket.
                logging.handlers.SocketHandler.emit(self, record)
            except Exception:
                self.handleError(record)
            # Inspect the message for broker-connection problems.
            message = record.getMessage()
            if 'consumer: Connection to broker lost.' in message:
                send_shutting_down_email(message, 'Connection to broker lost.')
                # Restart the worker: HUP the running celery processes and start a fresh one.
                os.system("ps auxww | grep celery | grep -v 'grep' | awk '{print $2}' | xargs kill -HUP && celery -A project worker -l info")
            elif 'Trying again in 2.00 seconds... (1/100)' in message:
                send_shutting_down_email(message, 'Trying to connect to broker url.')


    @after_setup_task_logger.connect
    @after_setup_logger.connect
    def initialize_logstash(logger=None, loglevel=logging.INFO, **kwargs):
        # Standard Logstash handler that ships all Celery logs.
        handler = logstash.TCPLogstashHandler(
            settings.LOGSTASH_HOST, 5960,
            tags=['celery-' + settings.ENV_NAME],
            message_type='celery', version=1,
        )
        handler.setLevel(loglevel)
        logger.addHandler(handler)
        # Custom handler that additionally watches for the connection-lost messages.
        customhandler = LogstashEmailHandler(
            host=settings.LOGSTASH_HOST, port=5960, sender=kwargs.get('sender'),
        )
        logger.addHandler(customhandler)
        return logger
    

    Here send_shutting_down_email is the function that takes the message and the subject as arguments and sends the email.

    # utils/email_alert.py
    from django.core.mail import EmailMultiAlternatives
    from django.template.loader import render_to_string
    from django.utils.html import strip_tags
    from project.settings import ENV_NAME


    def send_shutting_down_email(msg, sub):
        # Build the subject from the environment name and the short description.
        subject = f'Project Celery {ENV_NAME} message: {sub}'
        # Render the HTML body and derive a plain-text fallback from it.
        html_content = render_to_string('mails/celery.html', {'msg': msg, 'env': ENV_NAME})
        text_content = strip_tags(html_content)
        mail_to_send = EmailMultiAlternatives(
            subject,
            text_content,
            "alert@dummyemail.com",
            ['email@dummyemail.com'],
        )
        mail_to_send.attach_alternative(html_content, "text/html")
        mail_to_send.send()
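
    For the alert email to actually be delivered, Django's mail settings have to be configured as well. A minimal sketch, assuming a plain SMTP backend (host, credentials and addresses below are placeholders):

    # Project/settings.py -- email backend used by EmailMultiAlternatives (placeholder values)
    EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
    EMAIL_HOST = 'smtp.example.com'
    EMAIL_PORT = 587
    EMAIL_USE_TLS = True
    EMAIL_HOST_USER = 'alert@dummyemail.com'
    EMAIL_HOST_PASSWORD = 'change-me'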
    

    I believe there is a better way of doing this. If you know of one, please let me know!