Search code examples
django, websocket, django-channels

`channel_layer.group_send` won't call method in `AsyncWebsocketConsumer`


I wrote a WebSocket connection in a Django app using Django Channels, and I'm testing it in my local environment with Daphne (I will use uvicorn in production).

Here's a function that is called in the save method of the UserNotification model to send the notification's title and message through the user's WebSocket connection.

from typing import Type

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings


def send_user_notification(
    user_id: int | Type[int], title: str, message: str
):
    """Send a ``user_notify`` event to one user's notification group.

    The group name is built from the two ``NOTIFICATION_WEBSOCKET_*``
    settings templates: the per-user name is formatted first and then
    substituted into the group template.

    Args:
        user_id: Value substituted into the per-user name template.
        title: Notification title placed in the event payload.
        message: Notification text placed in the event payload.
    """
    layer = get_channel_layer()

    per_user_name = settings.NOTIFICATION_WEBSOCKET_CHANNEL_NAME.format(
        user_id=user_id
    )
    target_group = settings.NOTIFICATION_WEBSOCKET_GROUP_NAME.format(
        channel_name=per_user_name
    )

    # "type" selects the consumer method that handles the event.
    event = {
        "type": "user_notify",
        "message": {"title": title, "message": message},
    }

    # group_send is a coroutine function; wrap it so it can be called from
    # synchronous code.
    blocking_group_send = async_to_sync(layer.group_send)
    blocking_group_send(target_group, event)


class UserNotification(models.Model):
    """Associates a notification with a user; saving a row notifies that user."""

    user = models.ForeignKey("users.User", on_delete=models.CASCADE)
    notification = models.ForeignKey(
        to="notification.Notification", on_delete=models.CASCADE
    )

    def save(self, *args, **kwargs):
        """Persist the row, then send the notification over the channel layer.

        The row is written first so that a failing save never produces a
        notification for data that was not stored.
        """
        super().save(*args, **kwargs)

        # Pass exactly the parameters send_user_notification declares
        # (user_id, title, message); an unexpected keyword raises TypeError.
        send_user_notification(
            user_id=self.user_id,
            title=self.notification.title,
            message=self.notification.message,
        )

And here's my AsyncWebsocketConsumer:

import json
import logging

from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from django.conf import settings


class UserNotificationConsumer(AsyncWebsocketConsumer):
    """Consumer that joins a per-user group and relays ``user_notify`` events.

    NOTE: this is the non-working version from the question; the comments in
    ``connect`` point at the relevant lines.
    """

    async def connect(self):
        """Join the user's notification group and accept the connection."""
        if self.scope["user"].is_anonymous:
            # NOTE(review): there is no ``return`` after close(), so the
            # statements below still run for an anonymous user.
            await self.close()

        # NOTE: replacing self.channel_name is the defect this post is about.
        # The Solution below keeps the default value, after which
        # user_notify gets called.
        self.channel_name = settings.NOTIFICATION_WEBSOCKET_CHANNEL_NAME.format(
            user_id=self.scope["user"].pk
        )
        # Same two-step formatting as send_user_notification uses for the
        # group name.
        self.group_name = settings.NOTIFICATION_WEBSOCKET_GROUP_NAME.format(
            channel_name=self.channel_name
        )

        self.channel_layer = get_channel_layer()
        await self.channel_layer.group_add(self.group_name, self.channel_name)

        await self.accept()

    async def disconnect(self, close_code):
        """Remove this connection's channel from the group."""
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def user_notify(self, event):
        """Handle a ``user_notify`` event by sending its payload as JSON text."""
        print("Event: ", event)
        data = event["message"]
        await self.send(text_data=json.dumps(data))

And here's the asgi.py file:

import os

from channels.routing import ProtocolTypeRouter, URLRouter
from django.conf import settings
from django.core.asgi import get_asgi_application
from django.urls import path

# The settings module has to be known before anything reads Django settings.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "apply_link.settings")

# Initialise Django (and its app registry) before importing project modules
# that import ORM models; otherwise the imports below fail when this file is
# the process entry point (e.g. under uvicorn).
django_asgi_app = get_asgi_application()

from apps.notification.auth import JWTAuthMiddlewareStack  # noqa: E402
from apps.notification.consumers import UserNotificationConsumer  # noqa: E402

# Plain HTTP goes to Django; WebSocket connections are authenticated by the
# JWT middleware and then routed to the notification consumer.
application = ProtocolTypeRouter(
    {
        "http": django_asgi_app,
        "websocket": JWTAuthMiddlewareStack(
            URLRouter(
                [
                    path(
                        f"{settings.WEBSOCKET_PREFIX}/v1/notifications/",
                        UserNotificationConsumer.as_asgi(),
                    ),
                ]
            )
        ),
    }
)

And here's the authentication middleware:

from urllib.parse import parse_qs

from channels.middleware import BaseMiddleware
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
from rest_framework_simplejwt.tokens import AccessToken

from apps.users.models import User


class WebSocketJWTAuthMiddleware(BaseMiddleware):
    """Populate ``scope["user"]`` from a JWT passed in the query string.

    The token is read from the query parameter named by
    ``settings.WEBSOCKET_AUTH_QUERY_PARAM``. ``scope["user"]`` is always set:
    to the matching ``User`` when the token is valid, and to
    ``AnonymousUser`` when the token is missing, invalid, or refers to a
    user that does not exist. The inner application is always called, so it
    decides how to treat anonymous connections.
    """

    async def __call__(self, scope, receive, send):
        scope["user"] = await self._resolve_user(scope)
        return await super().__call__(scope, receive, send)

    @staticmethod
    async def _resolve_user(scope):
        """Return the user identified by the token in *scope*, else ``AnonymousUser``."""
        query_string = scope["query_string"].decode("utf8")
        # parse_qs leaves out absent parameters, so fall back to an empty
        # list instead of indexing into None.
        raw_tokens = parse_qs(query_string).get(
            settings.WEBSOCKET_AUTH_QUERY_PARAM, []
        )
        if not raw_tokens:
            return AnonymousUser()

        try:
            # Constructing the AccessToken validates it.
            access_token = AccessToken(raw_tokens[0])
        except (InvalidToken, TokenError):
            return AnonymousUser()

        try:
            return await User.objects.aget(id=access_token["user_id"])
        except (User.DoesNotExist, KeyError):
            # Unknown user, or a token without a "user_id" claim.
            return AnonymousUser()


def JWTAuthMiddlewareStack(inner):
    """Wrap *inner* in ``WebSocketJWTAuthMiddleware`` and return the wrapper."""
    authenticated_app = WebSocketJWTAuthMiddleware(inner)
    return authenticated_app

And here's my settings.py:

from urllib.parse import quote

# Name templates: the per-user name is formatted with ``user_id`` and the
# result is substituted into the group template as ``channel_name``.
NOTIFICATION_WEBSOCKET_CHANNEL_NAME = "user_{user_id}_notifications"
NOTIFICATION_WEBSOCKET_GROUP_NAME = "group_{channel_name}"

WEBSOCKET_AUTH_QUERY_PARAM = "token"
# Surrounding slashes and spaces are stripped so the prefix can be joined
# into a URL path.
WEBSOCKET_PREFIX = env.str("WEBSOCKET_PREFIX", "ws").strip("/ ")

REDIS_USER = env.str("REDIS_USER", "default")
REDIS_PASSWORD = env.str("REDIS_PASSWORD")
REDIS_HOST = env.str("DJANGO_REDIS_HOST")
REDIS_PORT_NUMBER = env.str("REDIS_PORT_NUMBER", "6379")

# Credentials are percent-encoded so characters such as "@", ":" or "/" in
# the user name or password cannot corrupt the URI.
REDIS_CONNECTION_URI = (
    f"redis://{quote(REDIS_USER, safe='')}:{quote(REDIS_PASSWORD, safe='')}"
    f"@{REDIS_HOST}:{REDIS_PORT_NUMBER}/0"
)

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [REDIS_CONNECTION_URI],
        },
    },
}
# Tested both memory and Redis as the backend
# CHANNEL_LAYERS = {
#     "default": {
#         "BACKEND": "channels.layers.InMemoryChannelLayer"
#     }
# }

Here's what I can do successfully and there are no issues:

  • I can execute runserver on 0.0.0.0:8000 and Daphne will serve the app.
  • Redis is working fine. Also I tried memory as backend and it is working.
  • I can connect to WebSocket, with Postman, with a valid JWT in the query parameters (and scope["user"] is the right user)

The problem is that when I save a UserNotification instance, it calls send_user_notification, and channel_layer.group_send is called successfully with both the in-memory and Redis channel layers. However, the UserNotificationConsumer.user_notify method is never called, and no data is sent to the user through the open, working WebSocket connection.


Solution

  • I found the solution.

    Overriding self.channel_name in the consumer's connect method was the issue. I kept its default value and used that when adding the channel to the group.

    So the final working version of Consumer class would be like this:

    import json
    
    from channels.generic.websocket import AsyncWebsocketConsumer
    from channels.layers import get_channel_layer
    from django.conf import settings
    
    
    class UserNotificationConsumer(AsyncWebsocketConsumer):
        """Consumer that relays ``user_notify`` group events to one user's socket.

        The connection's own ``self.channel_name`` is left untouched and is
        the value registered in the group.
        """

        # Set in connect(); stays None when the connection is rejected.
        group_name = None

        async def connect(self):
            """Reject anonymous users; otherwise join the user's group and accept."""
            if self.scope["user"].is_anonymous:
                await self.close()
                # Stop here so a rejected connection is neither added to a
                # group nor accepted.
                return

            # Build the group name exactly as send_user_notification() does:
            # per-user name first, then the group template. The group
            # template's placeholder is ``channel_name``, not ``user_id``.
            per_user_name = settings.NOTIFICATION_WEBSOCKET_CHANNEL_NAME.format(
                user_id=self.scope["user"].pk
            )
            self.group_name = settings.NOTIFICATION_WEBSOCKET_GROUP_NAME.format(
                channel_name=per_user_name
            )

            await self.channel_layer.group_add(self.group_name, self.channel_name)

            await self.accept()

        async def disconnect(self, close_code):
            """Leave the group, if this connection ever joined one."""
            if self.group_name is not None:
                await self.channel_layer.group_discard(
                    self.group_name, self.channel_name
                )

        async def user_notify(self, event):
            """Send the payload of a ``user_notify`` event to the client as JSON."""
            await self.send(text_data=json.dumps(event["message"]))