Search code examples
pythondjangowebsocketdjango-channels

Django-channels : ChatConsumer is sending message to only one user instead of sending it to both users


I am implementing a chat application in django and angular using django-channels and redis.

The sockets get connected and work properly but the problem I am facing is that when two users are online and connect the same chatroom with the same thread url it connects but the messages sent by any user are only sent to the user that connected the socket recently and they are sent twice to only that user.

In django I did the following configurations:

settings/base.py

....
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.sites',

    'channels',
    'chats'
]

ASGI_APPLICATION = "influnite.routing.application"

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
            # "hosts": [os.environ.get('REDIS_URL', 'redis://localhost:6379')]
        },
    },
}
....

routing.py

from django.conf.urls import url
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from channels.security.websocket import AllowedHostsOriginValidator, OriginValidator

from chats.consumers import ChatConsumer

application = ProtocolTypeRouter({
    'websocket': AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                [
                    url(r"^messages/(?P<thread_id>[\w.+]+)/", ChatConsumer())
                ]
            )
        )
    )
})

I created three models namely Thread, ThreadMember & ChatMessage.

chats/models.py

from django.db import models
from django.db.models import Q
from django.utils import timezone
from django.contrib.auth.models import User

from base.models import BaseModel

# Create your models here.
MESSAGE_TYPE = [
    ('text', 'Text'),
    ('audio', 'Audio'),
    ('img', 'Image'),
    ('doc', 'Document'),
    ('link', 'Link')
]

THREAD_TYPE = [
    ('individual', 'Individual'),
    ('group', 'Group')
]

class Thread(BaseModel):
    name = models.CharField(max_length=20, null=True, blank=True)
    timestamp = models.DateTimeField(auto_now_add=True)
    thread_type = models.CharField(max_length=20, choices=THREAD_TYPE, default='individual')

    class Meta:
        db_table = 'in_thread'
        verbose_name = 'threads'
        verbose_name_plural = 'thread'
        ordering = ['-update_date']

class ThreadMember(BaseModel):
    thread = models.ForeignKey(Thread, on_delete=models.CASCADE, related_name='thread_member')
    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='thread_member_user')
    is_grp_admin = models.BooleanField(default=False)

    def __str__(self):
        return f'{self.thread.name} > {self.user}'

    class Meta:
        db_table = 'in_thread_member'
        verbose_name = 'thread members'
        verbose_name_plural = 'thread member'

class ChatMessage(BaseModel):
    thread = models.ForeignKey(Thread, on_delete=models.CASCADE, related_name='msg_thread')
    sender = models.ForeignKey(ThreadMember, on_delete=models.CASCADE, related_name='msg_sender')
    message = models.TextField(null=True, blank=True)
    sent_at = models.DateTimeField(default=timezone.now())
    read_receipt = models.BooleanField(default=False)
    msg_type = models.CharField(max_length=20, choices=MESSAGE_TYPE, default='text')

    def __str__(self):
        return f'{self.sender} > {self.message}'

    class Meta:
        db_table = 'in_chat_message'
        verbose_name = 'chat message'
        verbose_name_plural = 'chat messages'
        ordering = ['sent_at']

Below is the consumers.py file which contains the ChatConsumer class.

chats/consumers.py

from django.contrib.auth.models import User

import asyncio, json
from channels.consumer import AsyncConsumer
from channels.db import database_sync_to_async

from .models import Thread, ThreadMember, ChatMessage
from .serializers import ChatMessageSerializer

class ChatConsumer(AsyncConsumer):
    async def websocket_connect(self, event):
        print("connected", event)
        try:
            kwargs = self.scope['url_route']['kwargs']

            thread_id = kwargs.get('thread_id', False)
            if thread_id:
                thread = await self.get_thread(thread_id)
                if thread:
                    self.chat_room = f'thread_{thread_id}'
                    await self.channel_layer.group_add(
                        self.chat_room,
                        self.channel_name
                    )
                    await self.send({
                        "type": "websocket.accept"
                    })
            else:
                await self.send({
                    "type": "websocket.close"
                })
        except Exception as e:
            print("Error in websocket connection!")
            print(e)

    async def websocket_receive(self, event):
        print("receive", event)
        try:
            kwargs = self.scope['url_route']['kwargs']
            
            thread_id = kwargs.get('thread_id', False)
            thread = await self.get_thread(thread_id)

            response = event.get('text', False)
            response = json.loads(response)
            message = response.get('message', False)

            if message:
                data, message_saved = await self.save_message(
                    message, response.get('sender'), thread)

                if message_saved:
                    text = json.dumps(response)

                    if thread:
                        await self.channel_layer.group_send(
                            self.chat_room,
                            {
                                "type": "chat_message",
                                "text": text
                            }
                        )
        except Exception as e:
            print("Error in websocket receive!")
            print(e)
    
    async def websocket_disconnect(self, event):
        print("disconnected", event)

    async def chat_message(self, event):
        """sends the actual message"""
        try:
            await self.send({
                "type": "websocket.send",
                "text": event['text']
            })
        except Exception as e:
            print("Error sending messages!")
            print(e)

    @database_sync_to_async
    def get_thread(self, thread_id):
        return Thread.objects.get(id=thread_id)
    
    @database_sync_to_async
    def save_message(self, message, sender, thread):
        try:
            sender = ThreadMember.objects.get(
                thread=thread.id,
                user=User.objects.get(id=sender))
            chat = ChatMessage.objects.create(
                thread=thread,
                sender=sender,
                message=message
            )
            chat.save()
            thread.save()
            return ChatMessageSerializer(chat).data, True
        except Exception as e:
            print("Error saving chat!")
            print(e)
            return False

I get the following when I run the redis server.

C:\Users\rh>cd C:\Program Files\Redis
C:\Program Files\Redis>redis-server redis.windows.conf
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 5.0.10 (1c047b68/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 9628
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

[9628] 11 Dec 12:38:17.011 # Server initialized
[9628] 11 Dec 12:38:17.011 * DB loaded from disk: 0.000 seconds
[9628] 11 Dec 12:38:17.011 * Ready to accept connections

I don't know what I'm doing wrong here and I appreciate some help. I'll improve my question if more information needed.

Thanks in Advance!


Solution

  • Try this:

    1. Import get_asgi_application from django.core.asgi.
    from django.core.asgi import get_asgi_application
    
    1. allow Django's ASGI application to handle traditional http requests by adding http as key & get_asgi_application() as the value of key in the dictionary inside ProtocolTypeRouter.
    application = ProtocolTypeRouter({
       "http": get_asgi_application(),
       ....
    })
    
    1. use/call the as_asgi() classmethod while routing the ChatConsumer consumer.
    url(
       r"^messages/(?P<username>[\w.@+-]+)/$",
       ChatConsumer.as_asgi()
    ),
    

    routing.py

    from django.conf.urls import url
    from django.core.asgi import get_asgi_application
    
    from channels.routing import ProtocolTypeRouter, URLRouter
    from channels.auth import AuthMiddlewareStack
    from channels.security.websocket import (
         AllowedHostsOriginValidator,
         OriginValidator
    )
    
    from chat.consumers import ChatConsumer
    
    application = ProtocolTypeRouter({
        "http": get_asgi_application(),
        "websocket": AllowedHostsOriginValidator(
            AuthMiddlewareStack(
                URLRouter([
                    url(
                       r"^messages/(?P<username>[\w.@+-]+)/$",
                       ChatConsumer.as_asgi()
                    ),
               ])
            )
        )
    })
    

    More Information on channels.readthedocs.io.