
Python Tornado KeyError When removing client from clients set


I have a Python Tornado Websocket server that stores clients in a shared set() so that I know how many clients are connected.

The challenge is that calling on_close after a WebSocketClosedError raises a KeyError, and the client instance is not removed from the set of connected clients. Because of this, my server has accumulated over 1000 clients even though only around 5 are actually active.

My Code:

import asyncio

import tornado.httpserver
import tornado.ioloop
import tornado.iostream
import tornado.web
import tornado.websocket


class SocketHandler(tornado.websocket.WebSocketHandler):
    socket_active_message = {"status": "Socket Connection Active"}
    waiters = set()

    def initialize(self):
        self.client_name = "newly_connected"

    def open(self):
        print('connection opened')
        # https://kite.com/python/docs/tornado.websocket.WebSocketHandler.set_nodelay
        self.set_nodelay(True)
        SocketHandler.waiters.add(self)

    def on_close(self):
        print("CLOSED!", self.client_name)
        SocketHandler.waiters.remove(self)

    def check_origin(self, origin):
        # Override the origin check if needed
        return True

    async def send_updates(self, message):
        print('starting socket service loop')
        loop_counter = 0
        while True:
            try:
                await self.write_message({'status': 82317581})
            except tornado.websocket.WebSocketClosedError:
                self.on_close()
            except tornado.iostream.StreamClosedError:
                self.on_close()
            except Exception as e:
                self.on_close()
                print('Exception e:', self.client_name)
            await asyncio.sleep(0.05)

    async def on_message(self, message):
        print("RECEIVED :", message)
        self.client_name = message
        await self.send_updates(message)


def run_server():
    # Create tornado application and supply URL routes
    webApp = tornado.web.Application(
        [
            (
                r"/",
                SocketHandler,
                {},
            ),
        ]
    )

    application = tornado.httpserver.HTTPServer(webApp)
    webApp.listen(3433)
    # Start IO/Event loop
    tornado.ioloop.IOLoop.instance().start()


run_server()

The stack trace:

Traceback (most recent call last):
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/web.py", line 1699, in _execute
    result = await result
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 278, in get
    await self.ws_connection.accept_connection(self)
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 881, in accept_connection
    await self._accept_connection(handler)
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 964, in _accept_connection
    await self._receive_frame_loop()
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 1118, in _receive_frame_loop
    await self._receive_frame()
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 1209, in _receive_frame
    await handled_future
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 658, in <lambda>
    self.stream.io_loop.add_future(result, lambda f: f.result())
  File "ask_So.py", line 50, in on_message
    await self.send_updates(message)
  File "ask_So.py", line 39, in send_updates
    self.on_close()
  File "ask_So.py", line 26, in on_close
    SocketHandler.waiters.remove(self)
KeyError: <__main__.SocketHandler object at 0x7ffef9f25520>

I have tried moving the waiters set outside the class but it still produces the same behaviour.

To reproduce the WebSocketClosedError, open many browser tabs as clients, then close them one at a time.


Solution

  • It seems like self.on_close() is being called twice: first you call it manually from inside send_updates(), and then, when the connection actually closes, Tornado calls self.on_close() as well. Since self was already removed from the set by the first call, set.remove() raises a KeyError the second time.
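The double call can be reproduced without Tornado at all. In this minimal sketch, `Handler` is a hypothetical stand-in for `SocketHandler` that models only the `waiters` bookkeeping; the two `on_close()` calls mimic the manual call in `send_updates()` followed by Tornado's own call when the socket really closes.

```python
class Handler:
    """Hypothetical stand-in for SocketHandler: only the set bookkeeping is modeled."""

    waiters = set()

    def open(self):
        Handler.waiters.add(self)

    def on_close(self):
        # set.remove() raises KeyError when the element is not present
        Handler.waiters.remove(self)


h = Handler()
h.open()
h.on_close()  # 1st call: the manual self.on_close() inside send_updates()

raised = False
try:
    h.on_close()  # 2nd call: Tornado's own on_close() when the socket really closes
except KeyError:
    raised = True

print("second on_close() raised KeyError:", raised)  # prints "second on_close() raised KeyError: True"
```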

    If you want to close the connection, just call self.close(). The self.on_close() method will be called by Tornado automatically.

    Also, you can catch the KeyError in a try...except block inside on_close, so a second call does no harm.
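A minimal sketch of that try...except, again using a hypothetical `Handler` class in place of the real `SocketHandler` so it runs without a server. The built-in `set.discard()`, which silently ignores missing elements, is an equivalent alternative to catching `KeyError`.

```python
class Handler:
    """Hypothetical stand-in for SocketHandler: only the set bookkeeping is modeled."""

    waiters = set()

    def __init__(self, client_name):
        self.client_name = client_name

    def open(self):
        Handler.waiters.add(self)

    def on_close(self):
        print("CLOSED!", self.client_name)
        try:
            Handler.waiters.remove(self)
        except KeyError:
            # Already removed by an earlier on_close() call; nothing to do.
            # Handler.waiters.discard(self) would behave the same without try/except.
            pass


a, b = Handler("a"), Handler("b")
a.open()
b.open()
a.on_close()
a.on_close()  # second call: no KeyError this time
print(len(Handler.waiters))  # prints 1
```

A defensive on_close like this makes a duplicate call harmless, but it does not replace removing the manual self.on_close() calls from send_updates().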


    Update

    The previous part of this answer should fix the KeyError related problem. This update is regarding why the clients are not being removed from waiters set.

    So, I tested your code and found a major problem with it here:

    async def on_message(self, message):
        print("RECEIVED :", message)
        self.client_name = message
        await self.send_updates(message)  # <- This is problematic
    

    Every time a client sends a message, on_message runs self.send_updates(). So if even a single client sends, say, 10 messages, send_updates is called 10 times and, as a result, you have 10 while loops running concurrently!

    As the number of loops increases, they ultimately block the server: Tornado is so busy juggling all those while loops that it has no time to run other code. Hence, the clients are never removed from waiters.
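To see why the loop count grows, here is a Tornado-free asyncio model: each invocation of a coroutine that contains a `while True` is an independent loop, so ten invocations running concurrently are ten loops. The `active_loops` counter, the `stop` event (which replaces `while True` so the demo can end), and starting each invocation with `asyncio.create_task()` are assumptions of this sketch, not Tornado's internal message dispatch.

```python
import asyncio

active_loops = 0  # hypothetical counter: how many send_updates() loops are running


async def send_updates(stop: asyncio.Event) -> None:
    global active_loops
    active_loops += 1
    try:
        while not stop.is_set():  # stands in for `while True`
            await asyncio.sleep(0.01)  # stands in for write_message() + sleep
    finally:
        active_loops -= 1


async def main() -> int:
    stop = asyncio.Event()
    # Ten "messages", each starting its own send_updates() loop
    tasks = [asyncio.create_task(send_updates(stop)) for _ in range(10)]
    await asyncio.sleep(0.05)  # give every loop a chance to start
    running = active_loops
    stop.set()
    await asyncio.gather(*tasks)
    return running


print(asyncio.run(main()))  # prints 10
```

Scheduling send_updates only once keeps this number at one, no matter how many messages arrive.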

    Solution

    Instead of calling send_updates every time a message arrives, call it just once: have a single while loop that sends updates to all clients.

    I'd update the code like this:

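    class SocketHandler(tornado.websocket.WebSocketHandler):
        waiters = set()
        # open(), on_close(), check_origin() etc. stay as before,
        # minus the `await self.send_updates(message)` in on_message()

        # Make it a classmethod so that it can be
        # called without an instance
        @classmethod
        async def send_updates(cls):
            print('starting socket service loop')
            while True:
                # Iterate over a copy: on_close() may remove a waiter
                # while this loop is suspended in `await`
                for waiter in list(cls.waiters):
                    # use `waiter` instead of `self`
                    try:
                        await waiter.write_message({'status': 82317581})
                    except (tornado.websocket.WebSocketClosedError,
                            tornado.iostream.StreamClosedError):
                        # Connection already gone; Tornado calls on_close()
                        # itself, so don't call it manually here
                        pass

                await asyncio.sleep(0.05)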
    

    Instead of calling send_updates from on_message, tell the IOLoop to call it once:

    def run_server():
        ... 
    
        # schedule SocketHandler.send_updates to be run
        tornado.ioloop.IOLoop.current().add_callback(SocketHandler.send_updates)
        tornado.ioloop.IOLoop.current().start()
    

    This will have only one while loop running for all clients.
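Putting both fixes together, here is a Tornado-free asyncio model of the single broadcast loop. `FakeClient`, `ClosedError`, the `received` counter, and the `stop` event are hypothetical stand-ins (for `SocketHandler`, `tornado.websocket.WebSocketClosedError`, delivered messages, and `while True` respectively); the real server would keep using Tornado's handler and `IOLoop.current().add_callback(SocketHandler.send_updates)`.

```python
import asyncio


class ClosedError(Exception):
    """Hypothetical stand-in for tornado.websocket.WebSocketClosedError."""


class FakeClient:
    """Hypothetical stand-in for a SocketHandler instance."""

    waiters = set()

    def __init__(self, client_name):
        self.client_name = client_name
        self.closed = False  # set when the "browser tab" goes away
        self.received = 0    # number of messages delivered to this client

    def open(self):
        FakeClient.waiters.add(self)

    def on_close(self):
        FakeClient.waiters.discard(self)  # harmless if called more than once

    async def write_message(self, message):
        if self.closed:
            raise ClosedError()
        self.received += 1

    @classmethod
    async def send_updates(cls, stop):
        # ONE loop serves every client; `stop` only exists so the demo can end
        while not stop.is_set():
            # Snapshot the set: with a real write_message(), on_close() can run
            # (and shrink the set) while this loop is suspended in `await`
            for waiter in list(cls.waiters):
                try:
                    await waiter.write_message({"status": 82317581})
                except ClosedError:
                    pass  # the framework, not this loop, calls on_close()
            await asyncio.sleep(0.01)


async def main():
    stop = asyncio.Event()
    a, b, c = FakeClient("a"), FakeClient("b"), FakeClient("c")
    for client in (a, b, c):
        client.open()
    # Started exactly once, like add_callback(SocketHandler.send_updates)
    broadcaster = asyncio.create_task(FakeClient.send_updates(stop))
    await asyncio.sleep(0.05)
    b.closed = True  # "b" closes its tab; writes to it now fail
    await asyncio.sleep(0.05)
    b.on_close()     # the framework calls on_close() ...
    b.on_close()     # ... and a duplicate call does no harm
    await asyncio.sleep(0.05)
    stop.set()
    await broadcaster
    return a, b, c


a, b, c = asyncio.run(main())
print(a.received, b.received, c.received, len(FakeClient.waiters))
```

Whatever the exact timing, `a` and `c` receive the same number of messages, `b` stops receiving once its tab is gone, and only two clients remain in `waiters`, even though `on_close()` ran twice for `b`.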