I have a Python Tornado Websocket server that stores clients in a shared set() so that I know how many clients are connected.
The challenge is that calling on_close after WebSocketClosedError raises a KeyError, and the client instance is not removed from the set of connected clients. This error has caused my server to accumulate over 1000 clients even when only around 5 clients are active.
My Code:
import asyncio

import tornado.httpserver
import tornado.ioloop
import tornado.iostream
import tornado.web
import tornado.websocket


class SocketHandler(tornado.websocket.WebSocketHandler):
    socket_active_message = {"status": "Socket Connection Active"}
    waiters = set()

    def initialize(self):
        self.client_name = "newly_connected"

    def open(self):
        print('connection opened')
        # https://kite.com/python/docs/tornado.websocket.WebSocketHandler.set_nodelay
        self.set_nodelay(True)
        SocketHandler.waiters.add(self)

    def on_close(self):
        print("CLOSED!", self.client_name)
        SocketHandler.waiters.remove(self)

    def check_origin(self, origin):
        # Override the origin check if needed
        return True

    async def send_updates(self, message):
        print('starting socket service loop')
        loop_counter = 0
        while True:
            try:
                await self.write_message({'status': 82317581})
            except tornado.websocket.WebSocketClosedError:
                self.on_close()
            except tornado.iostream.StreamClosedError:
                self.on_close()
            except Exception as e:
                self.on_close()
                print('Exception e:', self.client_name)
            await asyncio.sleep(0.05)

    async def on_message(self, message):
        print("RECEIVED :", message)
        self.client_name = message
        await self.send_updates(message)


def run_server():
    # Create tornado application and supply URL routes
    webApp = tornado.web.Application(
        [
            (
                r"/",
                SocketHandler,
                {},
            ),
        ]
    )
    application = tornado.httpserver.HTTPServer(webApp)
    webApp.listen(3433)
    # Start IO/Event loop
    tornado.ioloop.IOLoop.instance().start()


run_server()
The Stack-trace:
Traceback (most recent call last):
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/web.py", line 1699, in _execute
    result = await result
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 278, in get
    await self.ws_connection.accept_connection(self)
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 881, in accept_connection
    await self._accept_connection(handler)
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 964, in _accept_connection
    await self._receive_frame_loop()
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 1118, in _receive_frame_loop
    await self._receive_frame()
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 1209, in _receive_frame
    await handled_future
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/mnt/c/Users/EE/projects/new/venv/lib/python3.8/site-packages/tornado/websocket.py", line 658, in <lambda>
    self.stream.io_loop.add_future(result, lambda f: f.result())
  File "ask_So.py", line 50, in on_message
    await self.send_updates(message)
  File "ask_So.py", line 39, in send_updates
    self.on_close()
  File "ask_So.py", line 26, in on_close
    SocketHandler.waiters.remove(self)
KeyError: <__main__.SocketHandler object at 0x7ffef9f25520>
I have tried moving the waiters set outside the class but it still produces the same behaviour.
To simulate WebSocketClosedError: open many browser tabs as clients and close one browser tab at a time.
It seems like self.on_close() is being called twice. You call it manually from inside send_updates(), and later, when the connection is actually closed, Tornado calls self.on_close() as well. Since the self object was already removed from the set the first time, the second call raises a KeyError.
If you want to close the connection, just call self.close(). Tornado will then call self.on_close() automatically.
Also, you can handle the exception in a try...except block inside on_close.
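As a minimal sketch of making the cleanup tolerate a repeated call: set.discard() does nothing when the element is already gone, whereas set.remove() raises KeyError. The FakeHandler class and the close_twice helper below are hypothetical stand-ins (no Tornado involved) so that the snippet runs by itself.

```python
class FakeHandler:
    """Hypothetical stand-in for SocketHandler; not a Tornado class."""

    waiters = set()

    def open(self):
        FakeHandler.waiters.add(self)

    def on_close_strict(self):
        # Mirrors the question: remove() raises KeyError on a second call.
        FakeHandler.waiters.remove(self)

    def on_close_tolerant(self):
        # discard() is a no-op when the element is already gone.
        FakeHandler.waiters.discard(self)


def close_twice(handler, method_name):
    """Call the named cleanup method twice; report whether KeyError was raised."""
    try:
        getattr(handler, method_name)()
        getattr(handler, method_name)()
    except KeyError:
        return "KeyError"
    return "ok"
```

In the real handler, the same idea would be to use SocketHandler.waiters.discard(self) inside on_close and to replace the manual self.on_close() calls with self.close().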
The previous part of this answer should fix the KeyError problem. This update is about why the clients are not being removed from the waiters set.
So, I tested your code and found a major problem with it here:
async def on_message(self, message):
    print("RECEIVED :", message)
    self.client_name = message
    await self.send_updates(message)  # <- This is problematic
Whenever a client sends a message, it runs the self.send_updates method. So even if there's only one client and it sends a message, let's say, 10 times, send_updates will also be called 10 times and, as a result, you will have 10 while loops running simultaneously!
As the number of loops increases, it ultimately blocks the server. That means Tornado has no time to run other code, as it's busy juggling so many while loops. Hence, the clients are never removed from waiters.
Solution
Instead of calling send_updates every time a message arrives, you can call it just once and have a single while loop send updates to all clients.
I'd update the code like this:
class SocketHandler(...):
    # Make it a classmethod so that it can be
    # called without an instance
    @classmethod
    async def send_updates(cls):
        print('starting socket service loop')
        loop_counter = 0
        while True:
            for waiter in cls.waiters:
                # use `waiter` instead of `self`
                try:
                    await waiter.write_message({'status': 82317581})
                ...
            await asyncio.sleep(0.05)
Instead of calling send_updates from on_message, you'll have to tell IOLoop to call it once:
def run_server():
    ...
    # schedule SocketHandler.send_updates to be run
    tornado.ioloop.IOLoop.current().add_callback(SocketHandler.send_updates)
    tornado.ioloop.IOLoop.current().start()
This way, only one while loop runs for all clients.
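For illustration, here is a rough, Tornado-free sketch of the single-loop idea using plain asyncio. FakeClient, broadcast, demo, and the rounds and delay parameters are all hypothetical names; rounds exists only so the sketch terminates, whereas the real loop would run forever and would catch tornado.websocket.WebSocketClosedError rather than ConnectionError. It also iterates over a copy of the set, on the assumption that clients may connect or disconnect while the loop is awaiting a write.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a connected handler; not a Tornado class."""

    def __init__(self, name, fail=False):
        self.name = name
        self.fail = fail
        self.received = []

    async def write_message(self, message):
        # Simulate a closed connection with a generic error.
        if self.fail:
            raise ConnectionError(f"{self.name} is closed")
        self.received.append(message)


async def broadcast(waiters, message, rounds, delay=0.0):
    """Send `message` to every waiter, `rounds` times, from a single loop."""
    for _ in range(rounds):
        # Iterate over a copy: the set may change while we await.
        for waiter in list(waiters):
            try:
                await waiter.write_message(message)
            except ConnectionError:
                # Drop the dead client; discard() tolerates repeats.
                waiters.discard(waiter)
        await asyncio.sleep(delay)


def demo():
    alive = FakeClient("alive")
    dead = FakeClient("dead", fail=True)
    waiters = {alive, dead}
    asyncio.run(broadcast(waiters, {"status": 82317581}, rounds=3))
    return alive, waiters
```

Calling demo() leaves only the live client in the set, and that client receives one message per round.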