I want to implement a service based on web sockets in the Tornado framework. When a user closes a web socket, I want to notify the other users about this. However, on_close is apparently a blocking function and my _broadcast(str) -> None function is async.
How can I call this function anyway?

    import logging

    from tornado import web, websocket

    logger = logging.getLogger(__name__)


    class SocketHandler(websocket.WebSocketHandler):
        async def open(self, *args, conns, **kwargs):
            logger.info(f"Opened a new connection to client {id(self)}")
            self._conns = conns

        async def on_message(self, message):
            logger.info(f"Client {id(self)} sent message: {message}")
            await self._broadcast(message)

        def on_close(self):
            logger.info(f"Client {id(self)} has left the scene")
            self._broadcast("something")  # TODO

        async def _broadcast(self, msg):
            for conn in self._conns:
                try:
                    await conn.write_message(msg)
                except websocket.WebSocketClosedError:
                    pass


    app = web.Application([
        (r'/ws', SocketHandler)
    ])

    if __name__ == '__main__':
        ...  # server start-up not shown

The simple solution you're looking for is to use asyncio.create_task when calling the coroutine:

    def on_close(self):
        logger.info(f"Client {id(self)} has left the scene")
        asyncio.create_task(self._broadcast("something"))

(The legacy Tornado version of this function is tornado.gen.convert_yielded, but now that Tornado and asyncio are integrated there's no reason not to use the asyncio version for native coroutines.)
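
To make the create_task behaviour concrete, here is a minimal sketch that uses plain asyncio instead of Tornado so it can be run on its own. The names notify, on_close(log), background_tasks and demo are hypothetical stand-ins, not part of Tornado. It shows that a synchronous callback can schedule a coroutine, that the coroutine only starts once the event loop regains control, and that keeping a reference to the task is a sensible precaution against it being garbage-collected early.

```python
import asyncio

# Hypothetical registry: holds strong references so pending tasks are not
# garbage-collected before they finish.
background_tasks = set()


async def notify(log, msg):
    """Stand-in for an async _broadcast; records the message it 'sent'."""
    await asyncio.sleep(0)
    log.append(msg)


def on_close(log):
    """Synchronous callback: schedules the coroutine instead of awaiting it."""
    task = asyncio.create_task(notify(log, "client left"))
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task


async def demo():
    log = []
    task = on_close(log)
    ran_immediately = bool(log)  # the coroutine has not started yet
    await task                   # only needed here so the demo can inspect the result
    return ran_immediately, log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Note that create_task needs a running event loop, which is the case inside a Tornado handler callback such as on_close.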
But for this particular problem, the use of await in your _broadcast function is not ideal. Awaiting a write_message is used to provide flow control, but create_task doesn't do anything useful with the backpressure provided by await. (write_message is fairly unusual in that it is fully supported to call it both with and without await.) In fact, it applies backpressure to the wrong things: one slow connection will slow notifications to all the others that come after it.
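
The "wrong things" point can be illustrated with a small simulation. This is only a sketch under assumptions: FakeConn is a hypothetical stand-in for a WebSocket connection, and an asyncio.Event plays the role of a client whose send buffer is full. It shows a fast client receiving nothing while the sequential loop is stuck awaiting a slow client ahead of it.

```python
import asyncio


class FakeConn:
    """Hypothetical connection; a gate simulates a slow client."""

    def __init__(self, gate=None):
        self.gate = gate
        self.sent = []

    async def write_message(self, msg):
        if self.gate is not None:
            await self.gate.wait()  # pretend the send buffer is full
        self.sent.append(msg)


async def broadcast_sequential(conns, msg):
    # Same shape as the awaited loop in the question.
    for conn in conns:
        await conn.write_message(msg)


async def demo():
    gate = asyncio.Event()
    slow, fast = FakeConn(gate), FakeConn()
    task = asyncio.create_task(broadcast_sequential([slow, fast], "bye"))
    await asyncio.sleep(0.01)        # let the broadcast start and block on slow
    fast_before = list(fast.sent)    # fast is queued behind slow
    gate.set()                       # the slow client finally drains
    await task
    return fast_before, fast.sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```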
So in this case I'd advise making _broadcast a regular synchronous function:

    def _broadcast(self, msg):
        for conn in self._conns:
            try:
                conn.write_message(msg)
            except websocket.WebSocketClosedError:
                pass

If you want to be better able to control memory usage (via the flow control provided by await write_message), you'll need a more complicated solution, probably involving a bounded queue for each connection (in on_close, use put_nowait to add the message to every connection's queue, then have a task that reads from the queue and writes the message with await write_message).
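
One way the queue-per-connection idea might look is sketched below. It uses plain asyncio with a hypothetical FakeConn in place of a real WebSocketHandler so that it runs standalone. Outbox, offer, broadcast, the maxsize value and the drop-when-full policy are all illustrative assumptions, not Tornado APIs. In a real handler the writer would also need to handle websocket.WebSocketClosedError and be cancelled when the connection closes.

```python
import asyncio


class FakeConn:
    """Hypothetical stand-in for a WebSocketHandler; records what it 'sends'."""

    def __init__(self):
        self.sent = []

    async def write_message(self, msg):
        await asyncio.sleep(0)  # pretend to wait for the socket to drain
        self.sent.append(msg)


class Outbox:
    """One bounded queue plus one writer task per connection."""

    def __init__(self, conn, maxsize=100):
        self.conn = conn
        self.queue = asyncio.Queue(maxsize=maxsize)
        self.task = asyncio.create_task(self._writer())

    async def _writer(self):
        while True:
            msg = await self.queue.get()
            try:
                # Backpressure now affects only this connection's queue.
                await self.conn.write_message(msg)
            finally:
                self.queue.task_done()

    def offer(self, msg):
        """Non-blocking enqueue, callable from a synchronous on_close."""
        try:
            self.queue.put_nowait(msg)
            return True
        except asyncio.QueueFull:
            return False  # assumed policy: drop the message for a slow client


def broadcast(outboxes, msg):
    """Synchronous broadcast; returns how many queues accepted the message."""
    return sum(1 for box in outboxes if box.offer(msg))


async def demo():
    conns = [FakeConn() for _ in range(3)]
    outboxes = [Outbox(c, maxsize=2) for c in conns]
    accepted = broadcast(outboxes, "client left")
    await asyncio.gather(*(box.queue.join() for box in outboxes))
    for box in outboxes:
        box.task.cancel()  # stop the writers once the demo is done
    await asyncio.gather(*(box.task for box in outboxes), return_exceptions=True)
    return accepted, [c.sent for c in conns]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The bounded queue is what caps memory: a client that cannot keep up fills its own queue and starts losing messages (or could be disconnected instead), while the other clients are unaffected.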