Search code examples

How a django channel websocket middleware can receive message

How a middleware can read all the websocket message ?
From my understanding a django-channel middleware is like

    async def coroutine_call(self, inner_instance, scope, receive, send):
        ASGI coroutine; where we can resolve items in the scope
        (but you can't modify it at the top level here!)
        await inner_instance(receive, send)

I know that if I call await receive() instead of await inner_instance(receive, send), I will get a websocket message, but in this case the websocket handler will not work anymore.

How can a coroutine_call can receive the websocket message but also forward it to the next websocket middleware or handler ?


  • For the middleware to intercept the message it need to intercept the receive and send queues.

    here is an example of a complex middleware that does this

    A simpler version of this would be:

    class _InterceptionMiddleware:
        def __init__(self, application_cls, scope):
            self.application = application_cls(scope)
        async def __call__(self, receive, send):
            self.downstream_send = send
            # create a Queue for the upstream consumer to read messages from
            self.upstream_receive_queue = asyncio.Queue()
            # create a Queue for the upstream consumer to write messages to
            self.upstream_send_queue = asyncio.Queue()
            # pipe messages being sent to the upstream consumer to your interceptor method
            receiver = await_many_dispatch([receive], self.my_receive_interceptor_method)
            # pipe messages being sent buy the upstream consumer to your interceptor method
            sender = await_many_dispatch(
            # set up an asyncio task to handle these pipes
            receiver_task = asyncio.create_task(receiver)
            sender_task = asyncio.create_task(sender)
            # create an asyncio task for the upstream consumer
            upstream_task = asyncio.create_task(
                # pass the `get` and `put` methods of your upstream send and receive queues
                self.application(self.upstream_receive_queue.get, self.upstream_send_queue.put)
            # await it all
            done, pending = await asyncio.wait(
                [upstream_task, receiver_task, sender_task],
                # if any of them fail stop
            for task in [upstream_task, receiver_task, sender_task]:
                if not task.dont():
                    # we need to cancel this task.
                        await task
                    except CancelledError:
                        # we expect this error
        async def my_receive_interceptor_method(self, msg):
            # your interception code
            await self.upstream_receive_queue.put(msg)
        async def my_send_interceptor_method(self, msg):
            # your interception code
            await self.downstream_send(msg)
    def InterceptionMiddleware(application_cls):
        return functools.partial(_InterceptionMiddleware, application_cls)