I implemented a WebSocket connection in a Django app using Django Channels, and I'm testing it in my local environment with Daphne (I will use uvicorn in production).
Here's the function that is called from the `save` method of the `UserNotification` model to send the notification's `title` and `message` through the user's WebSocket connection:
from typing import Type
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
def send_user_notification(
    user_id: int | Type[int], title: str, message: str
):
    """Push a notification's title and message to a user's WebSocket group.

    The target group name is derived from ``user_id`` in two steps: the
    ``NOTIFICATION_WEBSOCKET_CHANNEL_NAME`` settings template is filled with
    the user id, and the result is fed into the
    ``NOTIFICATION_WEBSOCKET_GROUP_NAME`` template. The event is sent to that
    group with type ``"user_notify"``.
    """
    per_user_name = settings.NOTIFICATION_WEBSOCKET_CHANNEL_NAME.format(
        user_id=user_id
    )
    target_group = settings.NOTIFICATION_WEBSOCKET_GROUP_NAME.format(
        channel_name=per_user_name
    )
    event = {
        "type": "user_notify",
        "message": {"title": title, "message": message},
    }
    # async_to_sync lets this synchronous function call the channel layer's
    # group_send.
    dispatch = async_to_sync(get_channel_layer().group_send)
    dispatch(target_group, event)
class UserNotification(models.Model):
    """Associates a notification with the user who should receive it.

    Saving an instance also pushes the notification's title and message to
    the user's WebSocket group through ``send_user_notification``.
    """

    user = models.ForeignKey("users.User", on_delete=models.CASCADE)
    notification = models.ForeignKey(
        to="notification.Notification", on_delete=models.CASCADE
    )

    def save(self, *args, **kwargs):
        """Persist the row, then notify the user over WebSocket.

        Positional and keyword arguments are forwarded unchanged to
        ``Model.save``.
        """
        # Persist first so that no notification is pushed for a row that
        # failed to save.
        super().save(*args, **kwargs)
        # Pass only the arguments send_user_notification declares
        # (user_id, title, message).
        send_user_notification(
            user_id=self.user_id,
            title=self.notification.title,
            message=self.notification.message,
        )
And here's my `AsyncWebsocketConsumer`:
import json
import logging
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from django.conf import settings
class UserNotificationConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer meant to relay ``user_notify`` group events to a user.

    NOTE: this is the version from the question that never receives group
    messages. The text further down in this post identifies the assignment
    to ``self.channel_name`` in ``connect`` as the cause.
    """

    async def connect(self):
        """Join the per-user notification group and accept the connection."""
        if self.scope["user"].is_anonymous:
            await self.close()
        # NOTE(review): there is no return after close(), so the statements
        # below presumably still run for anonymous users - confirm.
        # Overwrites the consumer's channel_name with a name built from the
        # user id; this override is what the post later reports as the bug.
        self.channel_name = settings.NOTIFICATION_WEBSOCKET_CHANNEL_NAME.format(
            user_id=self.scope["user"].pk
        )
        # Group name is derived from the (overridden) channel name.
        self.group_name = settings.NOTIFICATION_WEBSOCKET_GROUP_NAME.format(
            channel_name=self.channel_name
        )
        self.channel_layer = get_channel_layer()
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Remove this channel from the notification group."""
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def user_notify(self, event):
        """Forward the ``message`` payload of a ``user_notify`` event as JSON text."""
        # Debug output of the raw event.
        print("Event: ", event)
        data = event["message"]
        await self.send(text_data=json.dumps(data))
And here's the `asgi.py` file:
# ASGI entry point: HTTP goes to Django, WebSocket connections go through the
# JWT auth middleware to the notification consumer.
import os

from channels.routing import ProtocolTypeRouter, URLRouter
from django.conf import settings
from django.core.asgi import get_asgi_application
from django.urls import path

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "apply_link.settings")

# Initialise Django (settings and app registry) before importing project
# modules that load ORM models: apps.notification.auth imports the User model,
# which cannot be imported until the app registry is ready.
django_asgi_app = get_asgi_application()

from apps.notification.auth import JWTAuthMiddlewareStack  # noqa: E402
from apps.notification.consumers import UserNotificationConsumer  # noqa: E402

application = ProtocolTypeRouter(
    {
        "http": django_asgi_app,
        "websocket": JWTAuthMiddlewareStack(
            URLRouter(
                [
                    path(
                        f"{settings.WEBSOCKET_PREFIX}/v1/notifications/",
                        UserNotificationConsumer.as_asgi(),
                    ),
                ]
            )
        ),
    }
)
And here's the authentication middleware:
from urllib.parse import parse_qs
from channels.middleware import BaseMiddleware
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
from rest_framework_simplejwt.tokens import AccessToken
from apps.users.models import User
class WebSocketJWTAuthMiddleware(BaseMiddleware):
    """Populate ``scope["user"]`` from a JWT access token in the query string.

    The token is read from the query parameter named by
    ``settings.WEBSOCKET_AUTH_QUERY_PARAM``. If the token is missing or fails
    validation, or its user no longer exists, ``scope["user"]`` is set to
    ``AnonymousUser`` and the connection is still handed to the inner
    application, which is expected to reject anonymous users.
    """

    async def __call__(self, scope, receive, send):
        """Resolve the user for this connection, then delegate to the inner app."""
        scope["user"] = await self._resolve_user(scope)
        return await super().__call__(scope, receive, send)

    async def _resolve_user(self, scope):
        """Return the user identified by the token in *scope*, or ``AnonymousUser``."""
        query_dict = parse_qs(scope.get("query_string", b"").decode("utf8"))
        raw_tokens = query_dict.get(settings.WEBSOCKET_AUTH_QUERY_PARAM)
        if not raw_tokens:
            # No token supplied at all: treat as unauthenticated instead of
            # failing on None[0].
            return AnonymousUser()
        try:
            # Constructing AccessToken validates the token and raises on failure.
            access_token = AccessToken(raw_tokens[0])
            user_id = access_token["user_id"]
        except (InvalidToken, TokenError, KeyError):
            return AnonymousUser()
        try:
            return await User.objects.aget(id=user_id)
        except User.DoesNotExist:
            return AnonymousUser()
def JWTAuthMiddlewareStack(inner):
    """Wrap *inner* in ``WebSocketJWTAuthMiddleware`` and return the wrapper."""
    jwt_layer = WebSocketJWTAuthMiddleware(inner)
    return jwt_layer
And here's my `settings.py`:
# Name templates used by the notification sender and consumer. The group
# template is filled with the result of the channel-name template.
NOTIFICATION_WEBSOCKET_CHANNEL_NAME = "user_{user_id}_notifications"
NOTIFICATION_WEBSOCKET_GROUP_NAME = "group_{channel_name}"

# Query-string parameter that carries the JWT on WebSocket connections.
WEBSOCKET_AUTH_QUERY_PARAM = "token"
# URL prefix for WebSocket routes, with surrounding slashes and spaces removed.
WEBSOCKET_PREFIX = env.str("WEBSOCKET_PREFIX", "ws").strip("/ ")

# Redis connection parameters, read from the environment.
REDIS_USER = env.str("REDIS_USER", "default")
REDIS_PASSWORD = env.str("REDIS_PASSWORD")
REDIS_HOST = env.str("DJANGO_REDIS_HOST")
REDIS_PORT_NUMBER = env.str("REDIS_PORT_NUMBER", "6379")
# NOTE(review): credentials are interpolated verbatim; values containing
# URL-reserved characters presumably need to be percent-encoded in the
# environment - confirm.
REDIS_CONNECTION_URI = "redis://{user}:{password}@{host}:{port}/0".format(
    user=REDIS_USER,
    password=REDIS_PASSWORD,
    host=REDIS_HOST,
    port=REDIS_PORT_NUMBER,
)

# Channel layer backed by Redis.
CHANNEL_LAYERS = {
    "default": dict(
        BACKEND="channels_redis.core.RedisChannelLayer",
        CONFIG=dict(hosts=[REDIS_CONNECTION_URI]),
    ),
}
# Tested both memory and Redis as the backend
# CHANNEL_LAYERS = {
# "default": {
# "BACKEND": "channels.layers.InMemoryChannelLayer"
# }
# }
Here's what works without any issues:
- Running `runserver` on `0.0.0.0:8000`; Daphne serves the app.
- Connecting with Postman, with a valid JWT in the query parameters (and `scope["user"]` is the right user).

The problem: when I save a `UserNotification` instance, `send_user_notification` is called and `channel_layer.group_send` completes successfully with both the in-memory and the Redis channel layers. However, the `UserNotificationConsumer.user_notify` method is never called, and no data is sent to the user through the open, working WebSocket connection.
I found the solution.
Overriding `self.channel_name` in the consumer's `connect` method was the issue. I kept its default value and used it when adding the consumer to the group.
So the final working version of the consumer class looks like this:
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from django.conf import settings
class UserNotificationConsumer(AsyncWebsocketConsumer):
    """Deliver ``user_notify`` group events to an authenticated user's socket.

    The consumer keeps the ``channel_name`` it is given by Channels and only
    adds that channel to the per-user notification group.
    """

    # Stays None until connect() has joined the group, so disconnect() can
    # tell whether there is a group to leave.
    group_name = None

    async def connect(self):
        """Reject anonymous users; otherwise join the user's group and accept."""
        user = self.scope["user"]
        if user.is_anonymous:
            await self.close()
            # Stop here: a rejected socket must not join a group or be accepted.
            return
        # Build the group name exactly as send_user_notification does: the
        # user id fills the channel-name template, and that result fills the
        # group template, whose only placeholder is {channel_name}.
        user_channel = settings.NOTIFICATION_WEBSOCKET_CHANNEL_NAME.format(
            user_id=user.pk
        )
        self.group_name = settings.NOTIFICATION_WEBSOCKET_GROUP_NAME.format(
            channel_name=user_channel
        )
        # self.channel_layer and self.channel_name are provided by the base
        # consumer; neither is reassigned here.
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Leave the notification group, if it was joined."""
        if self.group_name is None:
            return
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def user_notify(self, event):
        """Forward the ``message`` payload of a ``user_notify`` event as JSON text."""
        await self.send(text_data=json.dumps(event["message"]))