Search code examples
djangowebsocketdjango-channels

How a django channel websocket middleware can receive message


How a middleware can read all the websocket message ?
From my understanding a django-channel middleware is like https://github.com/django/channels/blob/2a98606c1e0600cbae71ae1f02f31aae5d01f82d/channels/middleware.py.

    async def coroutine_call(self, inner_instance, scope, receive, send):
        """
        ASGI coroutine; where we can resolve items in the scope
        (but you can't modify it at the top level here!)
        """
        await inner_instance(receive, send)

I know that if I call await receive() instead of await inner_instance(receive, send), I will get a websocket message, but in this case the websocket handler will not work anymore.

How can a coroutine_call can receive the websocket message but also forward it to the next websocket middleware or handler ?


Solution

  • For the middleware to intercept the message it need to intercept the receive and send queues.

    here is an example of a complex middleware that does this https://github.com/hishnash/channelsmultiplexer/blob/master/channelsmultiplexer/demultiplexer.py.

    A simpler version of this would be:

    
    
    class _InterceptionMiddleware:
        def __init__(self, application_cls, scope):
            self.application = application_cls(scope)
    
    
        async def __call__(self, receive, send):
            self.downstream_send = send
    
            # create a Queue for the upstream consumer to read messages from
            self.upstream_receive_queue = asyncio.Queue()
    
            # create a Queue for the upstream consumer to write messages to
            self.upstream_send_queue = asyncio.Queue()
    
            # pipe messages being sent to the upstream consumer to your interceptor method
            receiver = await_many_dispatch([receive], self.my_receive_interceptor_method)
    
            # pipe messages being sent buy the upstream consumer to your interceptor method
            sender = await_many_dispatch(
                [self.upstream_send_queue.get],
                self.my_send_interceptor_method
            )
    
            # set up an asyncio task to handle these pipes
            receiver_task = asyncio.create_task(receiver)
            sender_task = asyncio.create_task(sender)
    
            # create an asyncio task for the upstream consumer
            upstream_task = asyncio.create_task(
                # pass the `get` and `put` methods of your upstream send and receive queues
                self.application(self.upstream_receive_queue.get, self.upstream_send_queue.put)
            )
    
            # await it all
            done, pending = await asyncio.wait(
                [upstream_task, receiver_task, sender_task],
                # if any of them fail stop
                return_when=asyncio.FIRST_COMPLETED
            )
            for task in [upstream_task, receiver_task, sender_task]:
                if not task.dont():
                    # we need to cancel this task.
                    task.cancel()
                    try:
                        await task
                    except CancelledError:
                        # we expect this error
                        pass
    
    
        async def my_receive_interceptor_method(self, msg):
            # your interception code
            await self.upstream_receive_queue.put(msg)
    
    
        async def my_send_interceptor_method(self, msg):
            # your interception code
            await self.downstream_send(msg)
    
    
    def InterceptionMiddleware(application_cls):
        return functools.partial(_InterceptionMiddleware, application_cls)