Search code examples
djangowebsocketcelerymqttdjango-channels

Listen to mqtt topics with django channels and celery


I would like a way to integrate django with mqtt and for that the first thing that came in my mind was using django-channels and an mqtt broker that supports mqtt over web sockets, so I could communicate directly between the broker and django-channels.

However, I did not found a way to start a websocket client from django, and acording to this link it's not possible.

Since I'm also starting to study task queues I wonder if it would be a good practice to start an mqtt client using paho-mqtt and then run that in a separate process using celery. This process would then forward the messages receives by the broker to django channels through websockets, this way I could also communicate with the client process, to publish data or stop the mqtt client when needed, and all that directly from django.

I'm a little skeptical about this idea since I also read that process run in celery should not take too long to complete, and in this case that's exactly what I want to do.

So my question is, how much of a bad idea that is? Is there any other option to directly integrate django with mqtt?

*Note: I dont want to have a separate process running on the server, I want to be able to start and stop the process from django, in order to have full control over the mqtt client from the web gui


Solution

  • I found a better way that does not need to use celery.

    I simply started a mqtt client on app/apps.py on the ready method, so a client will be started everytime I run the application. From here I can communicate with other parts of the system using django-channels or signals.

    apps.py:

    from django.apps import AppConfig
    from threading import Thread
    import paho.mqtt.client as mqtt
    
    
    class MqttClient(Thread):
        def __init__(self, broker, port, timeout, topics):
        super(MqttClient, self).__init__()
        self.client = mqtt.Client()
        self.broker = broker
        self.port = port
        self.timeout = timeout
        self.topics = topics
        self.total_messages = 0
    
    #  run method override from Thread class
    def run(self):
        self.connect_to_broker()
    
    def connect_to_broker(self):
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.connect(self.broker, self.port, self.timeout)
        self.client.loop_forever()
    
    # The callback for when a PUBLISH message is received from the server.
    def on_message(self, client, userdata, msg):
        self.total_messages = self.total_messages + 1
        print(str(msg.payload) + "Total: {}".format(self.total_messages))
    
    # The callback for when the client receives a CONNACK response from the server.
    def on_connect(self, client, userdata, flags, rc):
        #  Subscribe to a list of topics using a lock to guarantee that a topic is only subscribed once
        for topic in self.topics:
            client.subscribe(topic)
    
    
    class CoreConfig(AppConfig):
        default_auto_field = 'django.db.models.BigAutoField'
        name = 'core'
    
    def ready(self):
        MqttClient("192.168.0.165", 1883, 60, ["teste/01"]).start()