
Is it possible to send an MQTT message with mqttasgi from inside a Celery worker that uses a Redis backend?


I am using the mqttasgi library in Django to receive a large number of messages and process them through a Redis-backed Celery queue, and I would like to publish the processed information back to another topic. Is this possible? If so, how can I do it? For the moment I am only overriding the publish method in my consumer, as shown below.

from mqttasgi.consumers import MqttConsumer
from mqtt_handler.tasks import processmqttmessage
import json

class MyMqttConsumer(MqttConsumer):

    async def connect(self):
        await self.subscribe('application/5/device/+/event/up', 2)

    async def receive(self, mqtt_message):
        print('Received a message at topic:', mqtt_message['topic'])
        print('With payload', mqtt_message['payload'])
        print('And QOS:', mqtt_message['qos'])
        print(type(mqtt_message['payload']))
        dictresult = json.loads(mqtt_message['payload'])
        print(type(dictresult))
        print(dictresult)
        jsonresult = json.dumps(dictresult)
        print(type(jsonresult))
        print(jsonresult)
        processmqttmessage.delay(jsonresult)
        print("test")

    async def publish(self, topic, payload, qos=1, retain=False):
        await self.send({
            'type': 'mqtt.pub',
            'mqtt': {
                'topic': topic,
                'payload': payload,
                'qos': qos,
                'retain': retain,
            }
        })

    async def disconnect(self):
        await self.unsubscribe('application/5/device/+/event/up')

I now want to be able to publish, but from inside my processmqttmessage task.

Thank you.

PS: @Santiago Ivulich, maybe you can help me with that.


Solution

  • Yes, it's possible, and there is no need to override the publish method of the base consumer. I would recommend sending the result that needs to be published back to mqttasgi, so that a single MQTT connection is maintained. For this you can use a channel layer group to send what needs to be published back to the consumer.

    from mqttasgi.consumers import MqttConsumer
    from mqtt_handler.tasks import processmqttmessage
    import json
    
    class MyMqttConsumer(MqttConsumer):
        async def connect(self):
            await self.subscribe('application/5/device/+/event/up', 2)
            # Subscribe consumer to channel layer group.
            await self.channel_layer.group_add("my.group", self.channel_name)
    
        async def receive(self, mqtt_message):
            print('Received a message at topic:', mqtt_message['topic'])
            print('With payload', mqtt_message['payload'])
            print('And QOS:', mqtt_message['qos'])
            print(type(mqtt_message['payload']))
            dictresult = json.loads(mqtt_message['payload'])
            print(type(dictresult))
            print(dictresult)
            jsonresult = json.dumps(dictresult)
            print(type(jsonresult))
            print(jsonresult)
            processmqttmessage.delay(jsonresult)
            print("test")
            pass
    
        async def publish_results(self, event):
            # Handler for "publish.results" events sent to the group by the Celery task.
            data = event['text']
            await self.publish('my/publish/topic', data, qos=2, retain=False)

        async def disconnect(self):
            await self.unsubscribe('application/5/device/+/event/up')
            # Leave the group when this consumer disconnects.
            await self.channel_layer.group_discard("my.group", self.channel_name)
    

    And from the Celery task:

    from celery import shared_task
    from channels.layers import get_channel_layer
    from asgiref.sync import async_to_sync

    @shared_task
    def processmqttmessage(jsonresult):
        ...
        channel_layer = get_channel_layer()
        # Ask the consumer's group to publish the result over its MQTT connection.
        async_to_sync(channel_layer.group_send)(
            "my.group",
            {"type": "publish.results", "text": "Hi from outside of the consumer"},
        )
    
    

    If multiple consumers will be running simultaneously, you can programmatically generate a group name for each consumer and pass it to the task as a parameter, as sketched below.
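    A minimal sketch of that idea (the uuid-based group name, the extra task argument, and the @shared_task decorator are illustrative assumptions, not part of the original answer):

        import json
        import uuid

        from mqttasgi.consumers import MqttConsumer
        from mqtt_handler.tasks import processmqttmessage

        class MyMqttConsumer(MqttConsumer):
            async def connect(self):
                await self.subscribe('application/5/device/+/event/up', 2)
                # One channel layer group per consumer instance (hypothetical naming scheme).
                self.group_name = f"mqtt.results.{uuid.uuid4().hex}"
                await self.channel_layer.group_add(self.group_name, self.channel_name)

            async def receive(self, mqtt_message):
                jsonresult = json.dumps(json.loads(mqtt_message['payload']))
                # Pass the group name so the task knows which consumer should publish the result.
                processmqttmessage.delay(jsonresult, self.group_name)

            async def publish_results(self, event):
                await self.publish('my/publish/topic', event['text'], qos=2, retain=False)

            async def disconnect(self):
                await self.unsubscribe('application/5/device/+/event/up')
                await self.channel_layer.group_discard(self.group_name, self.channel_name)

    And the task would accept the group name and use it as the group_send target:

        from celery import shared_task
        from channels.layers import get_channel_layer
        from asgiref.sync import async_to_sync

        @shared_task
        def processmqttmessage(jsonresult, group_name):
            # ... process the payload, then send the result back to this consumer's group.
            channel_layer = get_channel_layer()
            async_to_sync(channel_layer.group_send)(
                group_name, {"type": "publish.results", "text": jsonresult}
            )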

    IMPORTANT NOTE: Be certain that the Celery workers and the mqttasgi process use the same channel layer backend; otherwise the group_send from the task will never reach the consumer.
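    For example, assuming the Redis channel layer from channels-redis (the host and port are placeholders), both processes would load the same setting:

        # settings.py, shared by the mqttasgi process and the Celery worker.
        CHANNEL_LAYERS = {
            "default": {
                "BACKEND": "channels_redis.core.RedisChannelLayer",
                "CONFIG": {
                    "hosts": [("127.0.0.1", 6379)],
                },
            },
        }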