
Sanic Webserver: Websocket handler closes socket on return; looping breaks other request handlers


Scenario: I have a Sanic webserver serving a simple website. The website is basically a large HTML data table rendered with Vue templates. Since the table entries change every few minutes, the data is pushed over a websocket whenever it changes. There are around 2000 concurrent users. I have tried to implement a pub/sub architecture.

Problem: My websockets are closed as soon as my Sanic handler returns. I could put a loop inside the handler to keep it open, but keeping 2000 handlers open sounds like a bad idea... Also, the open handlers behave strangely. One thread or a small thread pool should be enough for the job. Maybe I misunderstood the Sanic documentation and need design advice.

Things I've tried:
- increasing the timeout setting to be high enough
- trying various other websocket settings in Sanic
- making my client-side JS return false from onmessage (see "Javascript websockets closing immediately after opening")
- setting the ws reference to null after passing it

Sanic Webserver's Index:

import os

from sanic.response import file

@app.route('/')
async def serve_index(request):
    return await file(os.path.join(os.path.dirname(__file__), 'index.html'))

Index.html's JS:

var app = new Vue({
    el: '#app',
    // data() must return the state object
    data() {
        return {
            manydata0: 0,
            manydata1: 0,
            ws: null,
        };
    },
    methods: {
        update: function (json_data) {
            var json = JSON.parse(json_data);
            this.manydata0 = json['data0'];
            this.manydata1 = json['data1'];
        }
    },
    created: function () {
        this.ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/reload');
        var messages = document.createElement('ul');
        // Arrow functions keep `this` bound to the Vue instance
        this.ws.onmessage = (event) => {
            console.log("new data");
            this.update(event.data);
            return false;
        };
        document.body.appendChild(messages);
        this.ws.onclose = (event) => {
            console.log("closed :(");
        };
    }
});

Sanic Webserver's Websocket Handler (1st Version, Sockets die immediately):

@app.websocket('/reload')
async def feed(request, ws):
    #time.sleep(42) # this causes the websocket to be created and closed on client side 42 seconds after my request
    await ws.send(Path(json).read_text()) # serve initial data
    connected_clients.append(ws) # subscribe to websocket list. another thread will read list entries and serve them updates

Sanic Webserver's Websocket Handler (2nd Version, handler blocks other request handlers):

@app.websocket('/reload')
async def feed(request, ws):
    mod_time = 0
    while True:
        try:
            stat = os.stat(json)
            if mod_time != stat.st_mtime:
                await ws.send(Path(json).read_text())
        except Exception as e:
            print("Exception while checking file: ", e)
    # this stops the server to handle other @app.routes like css, fonts, favicon

Sanic Webserver's Websocket Handler (3rd Version, unnecessary recv()):

@app.websocket('/reload')
async def feed(request, ws):
    mod_time = 0
    while True:
        try:
            stat = os.stat(json)
            if mod_time != stat.st_mtime:
                await ws.send(Path(json).read_text())
                await ws.recv() # if the client sends something from time to time, all is fine
        except Exception as e:
            print("Exception while checking file: ", e)

The last two snippets barely differ. If I add a ws.recv() and have the client send something suitable (on an interval, for example), everything works: the CSS, fonts and favicon are served again. But that cannot be the intended approach, can it? Surely this does not scale well?

All in all, this does not make much sense to me. What am I misunderstanding?


Solution

  • One of the Sanic core devs here.

    First, for an example of a pubsub type architecture, here is a gist I prepared. I think it might help.

    My basic idea is to create a single Feed object that loops in its own task, waiting for an event. In my case, the event is the receipt of a message from pubsub. In your case, it should be a change in the JSON document's modification time.
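    The polling loop that such a Feed task runs can be sketched on its own in plain asyncio, assuming the data lives in a file on disk. `watch_file`, `on_change`, `poll_interval` and `max_polls` are hypothetical names chosen for illustration, not Sanic APIs. Two details matter: the loop must remember the last modification time, and it must `await asyncio.sleep(...)` on every pass so the event loop can keep serving other requests.

```python
import asyncio
import os
from typing import Awaitable, Callable, Optional


async def watch_file(
    path: str,
    on_change: Callable[[str], Awaitable[None]],
    poll_interval: float = 1.0,
    max_polls: Optional[int] = None,
) -> None:
    """Await on_change(contents) whenever the mtime of `path` changes.

    max_polls only exists so the sketch can stop in a test; a real
    feed task would poll forever.
    """
    last_mtime: Optional[float] = None
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        try:
            mtime: Optional[float] = os.stat(path).st_mtime
        except FileNotFoundError:
            mtime = None  # no file yet: nothing to publish
        if mtime is not None and mtime != last_mtime:
            last_mtime = mtime  # remember it, otherwise every poll resends
            with open(path, encoding="utf-8") as fh:
                await on_change(fh.read())
        # Always yield to the event loop so other requests keep being served.
        await asyncio.sleep(poll_interval)
```

    In the Feed design, `on_change` would be the step that sends the new contents to every registered client.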

    Then, when Feed.receiver detects an event, it pushes the update out to all of the clients that are listening.
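    The fan-out step can also be sketched independently of Sanic. `broadcast` and the `Sendable` protocol are hypothetical names; the only thing assumed about a client is an awaitable `send(str)`, which is how the code below uses `client.interface.send`. Sending concurrently with `asyncio.gather` means one slow client does not hold up the others one by one, and `return_exceptions=True` keeps a single dead connection from aborting the whole broadcast.

```python
import asyncio
from typing import Iterable, List, Protocol


class Sendable(Protocol):
    async def send(self, data: str) -> None: ...


async def broadcast(clients: Iterable[Sendable], message: str) -> List[Sendable]:
    """Send `message` to every client concurrently.

    Returns the clients whose send raised, so the caller can unregister them.
    """
    targets = list(clients)  # snapshot: the set may change while we await
    results = await asyncio.gather(
        *(client.send(message) for client in targets),
        return_exceptions=True,
    )
    return [
        client
        for client, result in zip(targets, results)
        if isinstance(result, Exception)
    ]
```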

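    Inside the websocket handler itself, you want to keep the handler running: as soon as it returns, the connection closes. Keep it open by awaiting something (for example, waiting for the connection to close, or a loop around `await asyncio.sleep(...)`) rather than spinning in a bare `while True` loop, which never yields to the event loop and blocks every other request. If you do not care about receiving information from the client, you do not need to use `await ws.recv()`.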
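    On the scaling worry from the question: an open handler that is awaiting something is just a suspended coroutine, so thousands of them on a single event loop thread are cheap. What froze the server in the second version was looping without ever awaiting. A plain-asyncio sketch, where an `asyncio.Event` is a hypothetical stand-in for "the client disconnected" (not Sanic's API) and `other_route` stands in for an ordinary route such as the CSS or favicon:

```python
import asyncio
from typing import List


async def idle_handler(closed: asyncio.Event) -> None:
    # Stand-in for a websocket handler that must not return. Awaiting parks
    # the coroutine: no CPU is used and the loop keeps running other tasks.
    # A `while True: pass` here would never yield and would freeze the server.
    await closed.wait()


async def other_route(log: List[str]) -> None:
    # Stand-in for an ordinary route (css, fonts, favicon, ...).
    await asyncio.sleep(0)
    log.append("served")


async def demo(n_handlers: int = 2000) -> List[str]:
    closed = asyncio.Event()
    log: List[str] = []
    handlers = [asyncio.create_task(idle_handler(closed)) for _ in range(n_handlers)]
    # While every handler is "open", another request still completes promptly.
    await asyncio.wait_for(other_route(log), timeout=1.0)
    still_open = sum(not h.done() for h in handlers)
    log.append(f"{still_open} handlers open")
    closed.set()  # simulate every client disconnecting
    await asyncio.gather(*handlers)
    return log
```

    In a real handler, the awaited thing would be whatever signals a disconnect in your Sanic/websockets version.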


    So, in your case, using SUPER simple logic, I would do something like the following.

    This is untested code and may need some tweaking.

    import os
    import random
    import string
    from functools import partial
    from pathlib import Path
    
    from sanic import Sanic
    
    import asyncio
    import websockets
    from dataclasses import dataclass, field
    from typing import Optional, Set
    
    app = Sanic(__name__)
    
    FILE = "/tmp/foobar"
    TIMEOUT = 10
    INTERVAL = 20
    
    
    def generate_code(length=12, include_punctuation=False):
        characters = string.ascii_letters + string.digits
        if include_punctuation:
            characters += string.punctuation
        return "".join(random.choice(characters) for x in range(length))
    
    
    @dataclass
    class Client:
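        interface: websockets.server.WebSocketServerProtocol = field(repr=False)
        sid: str = field(default_factory=partial(generate_code, 36))
        # Set by Feed.register(); excluded from repr/eq so the hash stays stable
        feed: Optional["Feed"] = field(default=None, repr=False, compare=False)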
    
        def __hash__(self):
            return hash(str(self))
    
        async def keep_alive(self) -> None:
            while True:
                try:
                    try:
                        pong_waiter = await self.interface.ping()
                        await asyncio.wait_for(pong_waiter, timeout=TIMEOUT)
                    except asyncio.TimeoutError:
                        print("NO PONG!!")
                        await self.feed.unregister(self)
                        break  # client is gone; stop pinging it
                    else:
                        print(f"ping: {self.sid} on <{self.feed.name}>")
                    await asyncio.sleep(INTERVAL)
                except websockets.exceptions.ConnectionClosed:
                    print(f"broken connection: {self.sid} on <{self.feed.name}>")
                    await self.feed.unregister(self)
                    break
    
        async def shutdown(self) -> None:
            await self.interface.close()  # close() is a coroutine and must be awaited
    
        async def run(self) -> None:
            try:
                self.feed.app.add_task(self.keep_alive())
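                # Wait for the connection to close instead of busy-looping:
                # `while True: pass` never yields and freezes the whole server.
                # (Assumes the websockets-based protocol from the type hints.)
                await self.interface.wait_closed()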
            except websockets.exceptions.ConnectionClosed:
                print("connection closed")
            finally:
                await self.feed.unregister(self)
    
    
    class Feed:
        app: Sanic
        clients: Set[Client]
        cached = None
        name = "reload"  # label used in the log messages
    
        def __init__(self, app: Sanic):
            self.clients = set()
            self.app = app
    
        @classmethod
        async def get(cls, app: Sanic):
            is_existing = False
    
            if cls.cached:
                is_existing = True
                feed = cls.cached
            else:
                feed = cls(app)
                cls.cached = feed
    
            if not is_existing:
                feed.app.add_task(feed.receiver())
    
            return feed, is_existing
    
        async def receiver(self) -> None:
            print("Feed receiver started")
            mod_time = 0
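            while True:
                try:
                    stat = os.stat(FILE)
                    print(f"times: {mod_time} | {stat.st_mtime}")
                    if mod_time != stat.st_mtime:
                        mod_time = stat.st_mtime  # remember it, or we resend on every pass
                        content = self.get_file_contents()
                        # Iterate over a copy: unregister() may shrink the set while we await
                        for client in list(self.clients):
                            try:
                                print(f"\tSending to {client.sid}")
                                await client.interface.send(content)
                            except websockets.exceptions.ConnectionClosed:
                                print(f"ConnectionClosed. Client {client.sid}")
                                await self.unregister(client)
                except Exception as e:
                    print("Exception while checking file: ", e)
                # Poll once per second; without this await the loop never
                # gives other handlers a chance to run
                await asyncio.sleep(1)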
    
        async def register(
            self, websocket: websockets.server.WebSocketServerProtocol
        ) -> Optional[Client]:
            client = Client(interface=websocket)
            print(f">>> register {client}")
    
            client.feed = self
            self.clients.add(client)
    
            # Send initial content
            content = self.get_file_contents()
            await client.interface.send(content)
    
            print(f"\nAll clients\n{self.clients}\n\n")
    
            return client
    
        async def unregister(self, client: Client) -> None:
            print(f">>> unregister {client} on <{self.name}>")
            if client in self.clients:
                await client.shutdown()
                self.clients.remove(client)
                print(f"\nAll remaining clients\n{self.clients}\n\n")
    
        def get_file_contents(self):
            return Path(FILE).read_text()
    
    
    @app.websocket("/reload")
    async def feed(request, ws):
        feed, is_existing = await Feed.get(app)
    
        client = await feed.register(ws)
        await client.run()
    
    
    if __name__ == "__main__":
        app.run(debug=True, port=7777)