How to create a task that sends data continuously and disconnects safely in a Django Channels 2 consumer?


Following this answer, I can send data from a consumer every n seconds.

To handle disconnection properly, I wrote a create_task method and tried to stop its while loop (which sends data every n seconds) by calling it again with flag=False. I suspect this flag never reaches the instance that created the task.

consumers.py:

class AutoUpdateConsumer(AsyncConsumer):

    async def websocket_connect(self, event):
        print("connected", event)
        await self.send({
            "type": "websocket.accept"
        })
        await self.create_task(True)

    async def websocket_receive(self, event):
        print("receive", event)

    async def websocket_disconnect(self, event):
        await self.create_task(False)

        print("disconnected", event)


    async def create_task(self, flag=True):
        while flag:
            await asyncio.sleep(2)

            df= pd.DataFrame(data=[random.sample(range(100), 4) for _ in range(5)])

            await self.send({
                'type': 'websocket.send',
                'text': df.to_html(),
            })

Warning:

2019-09-11 14:40:06,400 - WARNING - server - Application instance 
<Task pending coro=<SessionMiddlewareInstance.__call__() running at 
D:\Django\Django channels\django_channels_env\lib\site-packages\channels\sessions.py:175>
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 
0x000001870E06C618>()] for connection <WebSocketProtocol client=
['127.0.0.1', 63789] path=b'/ws/home'> took too long to shut down and was 
killed.

How can I stop the task safely instead of waiting for Channels to kill it?

Or

How can I stop an infinite while loop running in one method from another method of the same class?

Versions:

  • Django == 2.0.7
  • channels == 2.1.2

Solution

  • I would suggest adding each connection to a group when it connects to the consumer. That way you can trigger a message from anywhere in your Django project, as long as you know the group name (here, auto_update).

    from channels.generic.websocket import AsyncWebsocketConsumer
    
    class AutoUpdateConsumer(AsyncWebsocketConsumer):
        async def connect(self):
            print('connect')
    
            # join the group
            self.group_name = 'auto_update'
            await self.channel_layer.group_add(
                self.group_name,
                self.channel_name
            )
            await self.accept()
    
        async def disconnect(self, close_code):
            print('disconnect')
    
            # leave the group
            await self.channel_layer.group_discard(
                self.group_name,
                self.channel_name
            )
    
        async def receive(self, text_data=None, bytes_data=None):
            print('receive')
    
        async def auto_update(self, event):
            print('sending df')
            df = event['df']
    
            # AsyncWebsocketConsumer.send takes text_data/bytes_data, not a raw message dict
            await self.send(text_data=df)
    

    To send the message I would use a custom management command. To stop the command I would create a singleton model (a model with only one instance) that has a boolean field that can be periodically checked to see if the loop should be stopped.

    First use get_channel_layer() to get the configured channel layer (for example, the Redis-backed one), then in the loop call group_send to invoke the consumer method named by the type key.

    # /project/app/management/commands/auto_update.py
    
    import random
    import time
    
    import pandas as pd
    from asgiref.sync import async_to_sync
    from channels.layers import get_channel_layer
    from django.core.management.base import BaseCommand
    
    from config.models import AutoUpdateSettings
    
    class Command(BaseCommand):
        help = 'Command to start auto updating'
    
        def handle(self, *args, **kwargs):
            # assumes the singleton settings row already exists
            settings = AutoUpdateSettings.objects.first()
            settings.keep_running = True
            settings.save()
    
            group_name = 'auto_update'
            channel_layer = get_channel_layer()
    
            while True:
                # re-read the flag so a change made in the admin stops the loop
                settings.refresh_from_db()
                if not settings.keep_running:
                    break
    
                df = pd.DataFrame(data=[random.sample(range(100), 4) for _ in range(5)])
                async_to_sync(channel_layer.group_send)(
                    group_name,
                    {
                        'type': 'auto_update',  # this is the name of your consumer method
                        'df': df.to_html()
                    }
                )
    
                # wait between updates, as in the question (every 2 seconds)
                time.sleep(2)
    

    To start the loop that sends the message to the group, run python manage.py auto_update. To stop it, use the admin page to set keep_running to False; the loop exits the next time it re-reads the setting.
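
For the narrower question of stopping a loop from another method of the same class, one option that needs no group or management command is to keep a reference to an asyncio task and cancel it on disconnect. In the question's code the loop is awaited directly inside websocket_connect, so that handler appears never to return, and flag is a local argument, so a later create_task(False) call cannot change the flag seen by the first call. The following is a minimal sketch in plain asyncio (Python 3.7+), not Channels itself: PeriodicSender, start, stop and the sent list are hypothetical names standing in for the consumer, websocket_connect, websocket_disconnect and self.send.

```python
import asyncio


class PeriodicSender:
    """Hypothetical stand-in for a consumer (not a Channels class)."""

    def __init__(self, interval):
        self.interval = interval
        self.sent = []      # stands in for self.send(...)
        self._task = None   # handle to the background loop

    async def _loop(self):
        # Runs until the task is cancelled from stop().
        n = 0
        while True:
            await asyncio.sleep(self.interval)
            n += 1
            self.sent.append(n)

    async def start(self):
        # Schedule the loop in the background instead of awaiting it,
        # so this method returns immediately.
        self._task = asyncio.ensure_future(self._loop())

    async def stop(self):
        if self._task is None:
            return
        self._task.cancel()
        try:
            # Wait for the cancellation to be processed.
            await self._task
        except asyncio.CancelledError:
            pass
        self._task = None


async def demo():
    sender = PeriodicSender(interval=0.01)
    await sender.start()
    await asyncio.sleep(0.1)    # let the loop send a few items
    await sender.stop()
    count_at_stop = len(sender.sent)
    await asyncio.sleep(0.05)   # nothing more should be sent after stop()
    return count_at_stop, len(sender.sent), sender._task


count_at_stop, count_later, task = asyncio.run(demo())
```

In a consumer, the same pattern would mean scheduling the loop in websocket_connect (rather than awaiting it) and cancelling it in websocket_disconnect.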