Search code examples
python-3.xpython-asyncioaiohttp

Timeout WebSocket connections in aiohttp


My WebSocket server implementation is open to the world, but the client is required to send an authenticate message after the connection was established or the server should close the connection.

How can I implement this in aiohttp? It seems, I need to do the following things:

  1. Create an on_open method for every socket connection: I can't find a way (similarly to on_open in Tornado) to create such event.

  2. Create a timer: asyncio's sleep or call_back methods of the main event loop may be used. But I can't find a way to send the WebSocketResponse to the callback function:

    await asyncio.sleep(10, timer, loop=request.app.loop)

  3. Closing the connection if not authenticated

This is what I had before with Tornado:

def open(self, *args, **kwargs):
    self.timeout = ioloop.IOLoop.instance().add_timeout(
        datetime.timedelta(seconds=60),
        self._close_on_timeout
    )

def remove_timeout_timer(self):
    ioloop.IOLoop.instance().remove_timeout(self.timeout)
    self.timeout = None

def on_message(self, message):
    if message = 'AUTHENTICATE':
        self.authenticated = True
        self.remove_timeout_timer

def _close_on_timeout(self):
    if not self.authenticated:
        if self.ws_connection:
            self.close()

Here is what I have using aiohttp for setting up a timer:

async def ensure_client_logged(ws):
    await asyncio.sleep(3)  # wait 3 seconds
    await ws.send_str('hello')

async def ws_handler(request):
    ws = web.WebSocketResponse()

    asyncio.ensure_future(ensure_client_logged(ws), loop=request.app.loop)

But the code is running in a blocking way, meaning the server becomes unresponsive while it is sleeping.

Can someone please point me in the right direction?


Solution

  • You need to establish a deadline for the authentication procedure. asyncio.wait_for is a convenient way to do that:

    async def ws_handler(request):
        loop = asyncio.get_event_loop()
        ws = web.WebSocketResponse()
        loop.create_task(handle_client(ws))
    
    async def handle_client(ws):
        try:
            authenticated = await asyncio.wait_for(_authenticate(ws), 10)
        except asyncio.TimeoutError:
            authenticated = False
        if not authenticated:
            ws.close()
            return
        # continue talking to the client
    
    async def _authenticate(ws):
        # implement authentication here, without worrying about
        # timeout - the coroutine will be automatically canceled
        # once the timeout elapses
        ...
        return True  # if successfully authenticated