I have got an mqtt consumer that listens to a topic and based on that, I used to send a response on another topic. However now I would like to create a Websocket Secure wss endpoint where i could stream this processed information. Could you tell me if it is possible to do that wth mqttasgi library, if yes how.
Here I leave the code of my consumer.
from mqttasgi.consumers import MqttConsumer
from mqtt_handler.tasks import processmqttmessage
import json
class MyMqttConsumer(MqttConsumer):
async def connect(self):
await self.subscribe('application/+/device/+/event/up', 2)
await self.channel_layer.group_add("stracontech", self.channel_name)
async def receive(self, mqtt_message):
print('Received a message at topic:', mqtt_message['topic'])
print('With payload', mqtt_message['payload'])
print('And QOS:', mqtt_message['qos'])
dictresult = json.loads(mqtt_message['payload'])
jsonresult = json.dumps(dictresult)
processmqttmessage.delay(jsonresult, mqtt_message['topic'])
pass
async def publish_results(self, event):
data = event['result']
await self.publish("stracontech/procesed/"+event['result']['device_id']+"/result", json.dumps(data).encode('utf-8'), qos=2, retain=False)
async def disconnect(self):
await self.unsubscribe('application/+/device/+/event/up')
Pd: @Santiago Ivulich maybe you can help me with that.
For Websocket consumer you need to follow the guide in channels.
An example:
from channels.generic.websocket import AsyncWebsocketConsumer
class MyConsumer(AsyncWebsocketConsumer):
groups = ["broadcast"]
async def connect(self):
# Called on connection.
# To accept the connection call:
await self.accept()
# Or accept the connection and specify a chosen subprotocol.
# A list of subprotocols specified by the connecting client
# will be available in self.scope['subprotocols']
await self.accept("subprotocol")
# To reject the connection, call:
await self.close()
async def receive(self, text_data=None, bytes_data=None):
# Called with either text_data or bytes_data for each frame
# You can call:
await self.send(text_data="Hello world!")
# Or, to send a binary frame:
await self.send(bytes_data="Hello world!")
# Want to force-close the connection? Call:
await self.close()
# Or add a custom WebSocket error code!
await self.close(code=4123)
async def disconnect(self, close_code):
# Called when the socket closes
Then on your asgi.py:
import os
import django
from channels.routing import ProtocolTypeRouter
from my_application.consumers import MyMqttConsumer
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'integrator_mqtt.settings')
django.setup()
application = ProtocolTypeRouter({
'http': get_asgi_application(),
"websocket": MyConsumer.as_asgi(),
'mqtt': MyMqttConsumer.as_asgi(),
})
I would recommend to use daphne as the protocol server for the service that listens and streams to websockets.