Tags: django, heroku, django-channels, daphne, asgi

Messages not getting to consumer unless Heroku dyno count is scaled way up


We have built a front end with React and a back end with Django REST Framework and Channels. We use Heroku Redis as our Redis provider. Our users connect to Channels via a ReconnectingWebSocket.

We are using Python 3.6 and Channels 2.4.

The issue is that our API calls try to pass information to the sockets, and the messages don't always reach the consumer. I logged each step of the call with prints: I print the channel_name the message is about to be sent to and confirm that it matches what was returned to the user on connect. The prints in the consumer are never called, though, meaning the message never gets sent to the user.

If I increase the number of dynos to roughly one per user connected to sockets, it seems to solve the problem (or at least makes delivery much more reliable). From my understanding, one dyno should be able to handle many socket connections. Is there a reason my consumer is not receiving the signals? Is there a reason scaling up the number of dynos resolved the problem?

On connect, I have the user join a group called "u_{their id}" so that signals can potentially be sent to multiple computers logged in as the same user. I have tried sending the message both through their channel_name directly and through that group; when messages aren't going through, neither route works. The prints verify the channel_names are correct, and the consumer still doesn't receive the messages. There don't seem to be any errors occurring. It may not work, then I'll refresh the recipient and it'll work, then I'll refresh the recipient again and it's back to not working.

The socket connection is certainly alive: I made a simple function on the front end that pings the socket, and when I call it (even while the consumer isn't getting signals from API calls), it responds.

I also notice that if I restart my dynos, when they load up and the sockets reconnect, signals from API calls work for the first user for a short time and then stop coming through again. Also, if I don't use the sockets for a while and then refresh, they seem to start working briefly again.

Procfile

web: daphne doctalk.asgi:application --port $PORT --bind 0.0.0.0

consumers.py

import json
from asgiref.sync import async_to_sync
from channels.db import database_sync_to_async

from channels.generic.websocket import AsyncWebsocketConsumer
from messages.models import Thread
from profile.models import OnlineStatus, DailyOnlineUserActivity
from rest_framework.authtoken.models import Token
from django.contrib.auth.models import AnonymousUser
from .exceptions import ClientError
import datetime
from django.utils import timezone

class HeaderConsumer(AsyncWebsocketConsumer):
    async def connect(self):   
        await self.accept()
        await self.send("request_for_token")


    async def continue_connect(self):
        print(self.channel_name)
        print(self.channel_layer)
        await self.send(json.dumps({'your_channel_name': self.channel_name}))

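        await self.get_user_from_token(self.scope['token'])
        if isinstance(self.user, AnonymousUser):
            # Invalid token: self.user['id'] below would raise, so stop here.
            await self.close()
            return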

        await self.channel_layer.group_send(
            "online_users",
            {
                "type": "new_user_online",
                "user": self.user,
                "channel_layer": str(self.channel_layer),
                "channel_name": self.channel_name,
            }
        )

        await self.channel_layer.group_add(
            "online_users",
            self.channel_name,
        )

        print("adding to personal group u_%d" % self.user['id'])
        await self.channel_layer.group_add(
            "u_%d" % self.user['id'],
            self.channel_name,
        )


        self.message_threads = set()

        self.message_threads = await self.get_message_ids()

        for thread in self.message_threads:
            await self.monitor_thread(thread)

        self.doa = await self.check_for_or_establish_dailyonlineactivity()
        self.online_status = await self.establish_onlinestatus()
        await self.add_to_online_status_list()

        self.user_id_list = await self.get_online_user_list()
        await self.send_online_user_list()

    async def disconnect(self, code):
        # Leave all the rooms we are still in
        if hasattr(self, 'user'):
            await self.remove_from_dailyonlineactivity()

            try:
                await self.channel_layer.group_discard(
                    "u_%d" % self.user['id'],
                    self.channel_name,
                )
            except Exception as e:
                print("issue with self channel")
                print(e)

            try:
                await self.channel_layer.group_send(
                    "online_users",
                    {
                        "type": "user_went_offline",
                        "message": self.user['id'],
                    }
                )

            except Exception as e:
                print("issue with online_users")
                print(e)

            await self.channel_layer.group_discard(
                "online_users",
                self.channel_name,
            )
            try:
                for thread_id in list(self.message_threads):
                    print("leaving " + str(thread_id))
                    try:
                        self.message_threads.discard(thread_id)
                        await self.channel_layer.group_discard(
                            "m_%d" % thread_id,
                            self.channel_name,
                        )
                    except ClientError:
                        pass
            except Exception as e:
                print("issue with threads")
                print(e)

    async def receive(self, text_data):
        print(text_data)
        text_data_json = json.loads(text_data)
        if 'token' in text_data_json:
            self.scope['token'] = text_data_json['token']
            await self.continue_connect()

        #self.send(text_data=json.dumps({
        #    'message': message
        #}))

    async def new_message(self, event):
        # Send a message down to the client
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "thread": event['thread'],
                "message": event["message"],
            },
        ))

    async def user_went_offline(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def send_call_ring(self, event):
        print("SENDING CALL RING")
        print(event["message"])
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def rejoin_call(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def popup_notification(self, event):
        print("sending popup_notification")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def new_call_participant(self, event):
        print("new_call_participant received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def new_participants_invited(self, event):
        print("new_participants_invited received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def share_document_via_videocall(self, event):    
        print("share_document received")
        print(event)
        print(self.channel_name)
        print(self.user['id'])
        
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def event_video_share_link(self, event):   

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_video_hand_up(self, event):    

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_video_address_hand_up(self, event):    

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def you_are_dominant_speaker(self, event):

        # Send a message down to the client
        print("SENDING DOMINANT SPEAKER")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def you_are_no_longer_dominant_speaker(self, event):

        print("SENDING NO LONGER DOMINANT SPEAKER")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def event_video_screenshare(self, event):    

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def event_video_reaction(self, event):    

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def video_call_thread(self, event):

        print("sending video call thread")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def video_call_chat_message(self, event):

        print("sending video call chat message")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_chat_message(self, event):

        print("sending event chat message")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def to_next_agenda_item(self, event):

        print("sending to next agenda item")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def mute_all_event_participants(self, event):

        print("sending mute all participants")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def event_started(self, event):

        print("event started consumer")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def event_ended(self, event):

        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))
        
    async def video_call_reaction(self, event):

        print("sending video call reaction")
        print(event)

        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))


    async def new_user_online(self, event):

        print("user_online received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["user"],
                "channel_layer": event["channel_layer"],
                "channel_name": event["channel_name"],
            },
        ))

    @database_sync_to_async
    def get_message_ids(self):
        return set(Thread.objects.filter(participants__id=self.user['id'], subject="").values_list('id', flat=True))

    async def monitor_thread(self, thread_id):
        print("monitoring thread %d" % thread_id)
        print("on channel %s" % self.channel_name)
        await self.channel_layer.group_add(
            "m_%d" % thread_id,
            self.channel_name,
        )

    @database_sync_to_async
    def get_user_from_token(self, t):
        try:
            print("trying token" + t)
            token = Token.objects.get(key=t)
            self.user = token.user.get_profile.json()
        except Token.DoesNotExist:
            print("failed")
            self.user = AnonymousUser()

    @database_sync_to_async
    def check_for_or_establish_dailyonlineactivity(self):
        doa, created = DailyOnlineUserActivity.objects.get_or_create(date=datetime.date.today())
        if created:
            print("created DOA %d" %doa.id)
        else:
            print("found existing DOA %d" %doa.id)
        return doa

    @database_sync_to_async
    def establish_onlinestatus(self):
        old_os = OnlineStatus.objects.filter(user_id=self.user['id'], online_to=None)
        if old_os.exists():
            for stale in old_os:
                # Log the id of the row being closed, not always the first one.
                print("found unclosed OS %d" % stale.id)
                stale.online_to = timezone.now()
                stale.save()
        new_os = OnlineStatus(
            user_id=self.user['id'],
            channel_name=self.channel_name
        )
        new_os.save()
        return new_os

    @database_sync_to_async
    def add_to_online_status_list(self):
        self.doa.currently_active_users.add(self.user['id'])
        self.doa.all_daily_users.add(self.user['id'])
        self.doa.online_log.add(self.online_status)
        self.doa.save()

    @database_sync_to_async
    def remove_from_dailyonlineactivity(self):
        if hasattr(self, 'doa') and self.doa is not None:
            self.doa.currently_active_users.remove(self.user['id'])
        if hasattr(self, 'online_status') and self.online_status is not None:
            self.online_status.online_to = timezone.now()
            self.online_status.save()

    @database_sync_to_async
    def get_online_user_list(self):   
        user_id_list = list(self.doa.currently_active_users.all().values_list('id', flat=True))
        user_id_list.remove(self.user['id'])
        return user_id_list

    async def send_online_user_list(self):
        print("sending online_users")
        await self.send(text_data=json.dumps(
            {
                "type": "online_users",
                "message": self.user_id_list,
            },
        ))

    async def participant_ignored(self, event):
        print("ignored call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def participant_left(self, event):
        print("left call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def participant_joined(self, event):
        print("joined call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))

    async def video_screenshare(self, event):

        print("sending screenshare")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["message"],
            },
        ))
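
Nearly all of the handlers above forward the same two keys, "type" and "message", to the client. One possible way to shorten the consumer, which is not part of the original code, is to build the payload in a single helper and point the handler names at one shared method. The names build_client_payload and forward_event are hypothetical, and the aliasing shown in the comments assumes Channels looks handlers up by attribute name on the consumer.

```python
import json


def build_client_payload(event, extra_keys=()):
    """Serialize a channel-layer event into the JSON text sent to the browser.

    Copies "type" and "message", plus any additional keys requested.
    """
    payload = {"type": event["type"], "message": event["message"]}
    for key in extra_keys:
        payload[key] = event[key]
    return json.dumps(payload)


# Hypothetical use inside HeaderConsumer (shown as comments because it
# needs Channels installed):
#
#     async def forward_event(self, event):
#         await self.send(text_data=build_client_payload(event))
#
#     # Each event type still needs a handler attribute with its own name.
#     send_call_ring = forward_event
#     participant_left = forward_event
#     event_ended = forward_event

print(build_client_payload({"type": "event_ended", "message": 42}))
```

Handlers that send a different shape, such as new_message and new_user_online, would keep their own methods.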

The Django signal handler triggered by adding a Profile to a VideoRoom:

@receiver(m2m_changed, sender=VideoRoom.invitees.through)
def invitee_added(sender, **kwargs):
    instance = kwargs.pop('instance', None)
    action = kwargs.pop('action', None)
    pk = kwargs.pop('pk_set', None)

    if action == 'post_add':    

        if len(pk) > 0:
            user = Profile.objects.get(id=list(pk)[0])
            if instance.initiator.id == user.id:
                return

            identity = "u_%d" % user.id

            # Create access token with credentials
            token = AccessToken(settings.TWILIO_ACCOUNT_SID, settings.TWILIO_API_KEY, settings.TWILIO_API_SECRET,
                                identity=identity, ttl=86399)

            # Create a Video grant and add to token
            video_grant = VideoGrant(room=instance.room_name)
            token.add_grant(video_grant)

            invitee_access_token = VideoAccessToken(user=user, token=token.to_jwt())
            invitee_access_token.save()

            instance.invitee_access_tokens.add(invitee_access_token)

            channel_layer = get_channel_layer()
            print(channel_layer)

            profiles = {"u_%d" % instance.initiator.id: instance.initiator.json()}

            for u in instance.current_participants.all():
                profiles["u_%d" % u.id] = u.json()
            print("instance.type")
            print(instance.type)
            if instance.type != 'event':
                print("sending to existing users")
                for key, value in profiles.items():
                    if value['id'] != user.id:
                        async_to_sync(channel_layer.group_send)(
                            key,
                            {'type': 'new_call_participant',
                             'message': {
                                 'key': "u_%d" % user.id,
                                 'value': user.json()
                             }
                             }
                        )

                ons = OnlineStatus.objects.get(user=user, online_to=None)
                print("in signal, sending to %s on channel %s" % (user.full_name, ons.channel_name))

                async_to_sync(channel_layer.send)(
                    ons.channel_name,
                    {'type': 'send_call_ring',
                     'message': {
                         'id': instance.id,
                         'room_name': instance.room_name,
                         'identity': "u_%d" % user.id,
                         'profiles': profiles,
                         'token': invitee_access_token.token.decode(),
                         'answered': False,
                         'initiated': False,
                         'caller': instance.initiator.json()
                     }
                     }
                )

Log during unsuccessful socket signal:

2021-03-11T15:16:14.489596+00:00 app[web.1]: pk
2021-03-11T15:16:14.489655+00:00 app[web.1]: {113}
2021-03-11T15:16:14.518051+00:00 app[web.1]: pk
2021-03-11T15:16:14.518058+00:00 app[web.1]: {68}
2021-03-11T15:16:14.786357+00:00 app[web.1]: sending to existing users
2021-03-11T15:16:14.786377+00:00 app[web.1]: u_113
2021-03-11T15:16:14.911441+00:00 app[web.1]: u_68
2021-03-11T15:16:14.915900+00:00 app[web.1]: in signal, sending to John Doe on channel u_68
2021-03-11T15:16:15.228644+00:00 app[web.1]: 10.63.249.212:12999 - - [11/Mar/2021:10:16:15] "POST /api/start-video-chat/" 200 3523
2021-03-11T15:16:15.231562+00:00 heroku[router]: at=info method=POST path="/api/start-video-chat/" host=project-name.herokuapp.com request_id=7ec75a21-c6bd-452b-9517-cd500064d7ee fwd="12.34.56.78" dyno=web.1 connect=3ms service=955ms status=200 bytes=3714 protocol=http

On a successful call:

2021-03-11T15:20:50.253243+00:00 app[web.4]: pk
2021-03-11T15:20:50.253248+00:00 app[web.4]: {113}
2021-03-11T15:20:50.280925+00:00 app[web.4]: pk
2021-03-11T15:20:50.280926+00:00 app[web.4]: {68}
2021-03-11T15:20:50.614504+00:00 app[web.4]: sending to existing users
2021-03-11T15:20:50.614527+00:00 app[web.4]: u_113
2021-03-11T15:20:50.713880+00:00 app[web.4]: u_68
2021-03-11T15:20:50.718141+00:00 app[web.4]: in signal, sending to John Doe on channel u_68
2021-03-11T15:20:50.799546+00:00 app[web.2]: CALLING
2021-03-11T15:20:50.801670+00:00 app[web.2]: {'type': 'send_call_ring', 'message': "some payload data"}
2021-03-11T15:20:50.965602+00:00 app[web.4]: 10.11.225.205:25635 - - [11/Mar/2021:10:20:50] "POST /api/start-video-chat/" 200 3533
2021-03-11T15:20:50.964378+00:00 heroku[router]: at=info method=POST path="/api/start-video-chat/" host=project-name.herokuapp.com request_id=2da9918b-b587-4db9-a3c2-9d6dfd55ef42 fwd="12.34.56.78" dyno=web.4 connect=1ms service=888ms status=200 bytes=3724 protocol=http
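
Comparing the two logs, the successful request is handled on one dyno (web.4) while the consumer's print appears on another (web.2), so the message had to travel between dynos through the channel layer. In the failing log no consumer line appears on any dyno. Below is a small sketch for pulling the dyno name out of each app line, assuming the Heroku log format shown above; LOG_LINE and dynos_by_message are names made up for illustration.

```python
import re

# Matches lines such as:
# 2021-03-11T15:20:50.799546+00:00 app[web.2]: CALLING
LOG_LINE = re.compile(
    r"^(?P<ts>\S+) (?P<source>\w+)\[(?P<dyno>[\w.]+)\]: (?P<msg>.*)$"
)


def dynos_by_message(log_text):
    """Return (dyno, message) pairs for application log lines, in order."""
    pairs = []
    for line in log_text.splitlines():
        match = LOG_LINE.match(line.strip())
        # Skip router lines and anything that does not look like a log line.
        if match and match.group("source") == "app":
            pairs.append((match.group("dyno"), match.group("msg")))
    return pairs


# Two lines copied from the successful log above.
SUCCESS_LOG = """\
2021-03-11T15:20:50.718141+00:00 app[web.4]: in signal, sending to John Doe on channel u_68
2021-03-11T15:20:50.799546+00:00 app[web.2]: CALLING
"""

for dyno, msg in dynos_by_message(SUCCESS_LOG):
    print(dyno, "|", msg)
```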

Solution

  • The issue ended up being Redis. I switched from channels-redis to channels-rabbitmq and all of my issues went away. I don't know whether the problem was with my Redis provider or with channels-redis, but simply changing the backend resolved everything.
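
The answer does not show its settings, so the following is only a sketch of what such a backend switch might look like in settings.py. It assumes the RabbitMQ add-on exposes its URL in an environment variable named CLOUDAMQP_URL and that Heroku Redis exposes REDIS_URL; the helper name build_channel_layers is hypothetical. The two BACKEND paths are the ones documented by channels_rabbitmq and channels_redis.

```python
import os


def build_channel_layers(env=os.environ):
    """Return a CHANNEL_LAYERS dict, preferring RabbitMQ when a URL is set."""
    amqp_url = env.get("CLOUDAMQP_URL")  # assumed variable name
    if amqp_url:
        return {
            "default": {
                "BACKEND": "channels_rabbitmq.core.RabbitmqChannelLayer",
                "CONFIG": {"host": amqp_url},
            }
        }
    # Fall back to Redis, e.g. for local development.
    redis_url = env.get("REDIS_URL", "redis://localhost:6379")
    return {
        "default": {
            "BACKEND": "channels_redis.core.RedisChannelLayer",
            "CONFIG": {"hosts": [redis_url]},
        }
    }


# In settings.py:
CHANNEL_LAYERS = build_channel_layers()
```

Keeping the choice in one function means the consumer and the signal handler stay unchanged, since both only talk to the layer returned by get_channel_layer().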