Scenario: I have a sanic webserver serving a simple website. The website is basically a large data table in html with vue template support. Since the table entries change every few minutes, the data is delivered via websocket on change. Around 2000 users at the same time. I have tried to implement a pub/sub architecture.
Problem: My websockets are closed as soon as my sanic handler returns. I could have a loop inside to keep the handler open. But keeping 2000 handlers open sounds like a bad idea... Also the open handlers behave strange. One thread or a small threadpool should do the job. Maybe I got the sanic documentation wrong and need design advice.
Things I've tried: - increasing the timeout setting to be high enough - trying various other websocket settings in sanic - let my client side js return false onmessage (Javascript websockets closing immediately after opening) - set the ws reference to null after passing it
Sanic Webserver's Index:
@app.route('/')
async def serve_index(request):
return await file(os.path.join(os.path.dirname(__file__), 'index.html'))
Index.html's JS:
var app = new Vue({
el: '#app',
data() {
manydata0: 0,
manydata1: 0,
ws: null,
}
},
methods: {
update: function (json_data) {
json = JSON.parse(json_data);
this.manydata0 = json['data0'];
this.manydata1 = json['data1'];
}
},
created: function () {
this.ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/reload');
messages = document.createElement('ul');
this.ws.onmessage = function (event) {
console.log("new data")
app.update(event.data);
return false;
};
document.body.appendChild(messages);
this.ws.onclose = function (event) {
console.log("closed :(")
};
Sanic Webserver's Websocket Handler (1st Version, Sockets die immediately):
@app.websocket('/reload')
async def feed(request, ws):
#time.sleep(42) # this causes the websocket to be created and closed on client side 42 seconds after my request
await ws.send(Path(json).read_text()) # serve initial data
connected_clients.append(ws) # subscribe to websocket list. another thread will read list entries and serve them updates
Sanic Webservers's Websocket Handler (2nd Version, Handler blocks other req handlers)
@app.websocket('/reload')
async def feed(request, ws):
mod_time = 0
while True:
try:
stat = os.stat(json)
if mod_time != stat.st_mtime:
await ws.send(Path(json).read_text())
except Exception as e:
print("Exception while checking file: ", e)
# this stops the server to handle other @app.routes like css, fonts, favicon
Sanic Webservers's Websocket Handler (3nd Version, unnecessary recv())
@app.websocket('/reload')
async def feed(request, ws):
mod_time = 0
while True:
try:
stat = os.stat(json)
if mod_time != stat.st_mtime:
await ws.send(Path(json).read_text())
await recv() # if the client sends from time to time all is fine
except Exception as e:
print("Exception while checking file: ", e)
The last two code snippets don't differ much. I add a ws.recv() and send some fitting stuff from client side (in an interval for example), then everything works. Then css, fonts and favicon are sent. But that cannot be intended, can it? This should not scale well, right?
All in all that does not make much sense to me. What am I missunderstanding?
one of the Sanic core-devs here.
First, for an example of a pubsub type architecture, here is a gist I prepared. I think it might help.
My basic idea is to create a single Feed
object that loops in its own task looking for an event. In my case it is receipt of information from pubsub. In your case, it should be checking the time on the JSON document.
Then, when that Feed.receiver
has an event triggered, it then pings out to all of the clients that are listening.
Inside the websocket
handler itself, you want to keep that open. If you do not, then the connection will close. If you do not care about receiving information from the client, you do not need to use await recv()
.
So, in your case, using SUPER simple logic, I would do something like the following.
This is untested code, may need some tweak
import os
import random
import string
from functools import partial
from pathlib import Path
from sanic import Sanic
import asyncio
import websockets
from dataclasses import dataclass, field
from typing import Optional, Set
app = Sanic(__name__)
FILE = "/tmp/foobar"
TIMEOUT = 10
INTERVAL = 20
def generate_code(length=12, include_punctuation=False):
characters = string.ascii_letters + string.digits
if include_punctuation:
characters += string.punctuation
return "".join(random.choice(characters) for x in range(length))
@dataclass
class Client:
interface: websockets.server.WebSocketServerProtocol = field(repr=False)
sid: str = field(default_factory=partial(generate_code, 36))
def __hash__(self):
return hash(str(self))
async def keep_alive(self) -> None:
while True:
try:
try:
pong_waiter = await self.interface.ping()
await asyncio.wait_for(pong_waiter, timeout=TIMEOUT)
except asyncio.TimeoutError:
print("NO PONG!!")
await self.feed.unregister(self)
else:
print(f"ping: {self.sid} on <{self.feed.name}>")
await asyncio.sleep(INTERVAL)
except websockets.exceptions.ConnectionClosed:
print(f"broken connection: {self.sid} on <{self.feed.name}>")
await self.feed.unregister(self)
break
async def shutdown(self) -> None:
self.interface.close()
async def run(self) -> None:
try:
self.feed.app.add_task(self.keep_alive())
while True:
pass
except websockets.exceptions.ConnectionClosed:
print("connection closed")
finally:
await self.feed.unregister(self)
class Feed:
app: Sanic
clients: Set[Client]
cached = None
def __init__(self, app: Sanic):
self.clients = set()
self.app = app
@classmethod
async def get(cls, app: Sanic):
is_existing = False
if cls.cached:
is_existing = True
feed = cls.cached
else:
feed = cls(app)
cls.cached = feed
if not is_existing:
feed.app.add_task(feed.receiver())
return feed, is_existing
async def receiver(self) -> None:
print("Feed receiver started")
mod_time = 0
while True:
try:
stat = os.stat(FILE)
print(f"times: {mod_time} | {stat.st_mtime}")
if mod_time != stat.st_mtime:
content = self.get_file_contents()
for client in self.clients:
try:
print(f"\tSending to {client.sid}")
await client.interface.send(content)
except websockets.exceptions.ConnectionClosed:
print(f"ConnectionClosed. Client {client.sid}")
except Exception as e:
print("Exception while checking file: ", e)
async def register(
self, websocket: websockets.server.WebSocketServerProtocol
) -> Optional[Client]:
client = Client(interface=websocket)
print(f">>> register {client}")
client.feed = self
self.clients.add(client)
# Send initial content
content = self.get_file_contents()
client.interface.send(content)
print(f"\nAll clients\n{self.clients}\n\n")
return client
async def unregister(self, client: Client) -> None:
print(f">>> unregister {client} on <{self.name}>")
if client in self.clients:
await client.shutdown()
self.clients.remove(client)
print(f"\nAll remaining clients\n{self.clients}\n\n")
def get_file_contents(self):
return Path(FILE).read_text()
@app.websocket("/reload")
async def feed(request, ws):
feed, is_existing = await Feed.get(app)
client = await feed.register(ws)
await client.run()
if __name__ == "__main__":
app.run(debug=True, port=7777)