Search code examples
python-3.xtornado

Tornado asynchronous coroutine


Longtime didn't use tornado. I would like to have a websocket which get's updates from a serial device of a host where tornado runs. So I tried multiprocessing with tornado but the process has no access to the tornado websocket. I tried to incorporate it as coroutine but that seems to not not spawn.

class WebApplication(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r'/', IndexPageHandler),
            (r"/config", ConfigHandler),
            (r"/shutdown", ShutdownHandler),
            (r'/websocket', WebSocketHandler),
            (r'/(.*)', tornado.web.StaticFileHandler, {'path': resourcesWeb})
        ]

        settings = {
            'debug': debug,
            'static_path': resourcesWeb,
            'template_path': 'templates'
        }
        tornado.web.Application.__init__(self, handlers, **settings)

    @gen.coroutine
    def serial_reader(self):
        log('serial_reader: start')
        done = False
        while not done:
            sh.read()
            serial_data_from = str(sh.data)
            if len(serial_data_from) > 0:
                if debug:
                    log('serial read:' + serial_data_from)
                    yield [con.write_message(serial_data_from) for con in WebSocketHandler.connections]
            yield gen.sleep(0.3)
        log('serial_reader: exit')

Python 3.8.5, Tornad 6.1

how would I properly and constantly update a websocket with data from outside the the tornado app


Solution

  • Since sh.read is blocking, you'll need to run it in an executor. To then notify clients in the main thread, you'll need to use IOLoop.add_callback (safe to call from any thread). This also means the reader method becomes a regular sync method.

    Example:

    from concurrent.futures import ThreadPoolExecutor
    import functools
    
    from tornado import web, websocket, ioloop
    
    log = print
    
    
    class IndexHandler(web.RequestHandler):
        def get(self):
            self.write("""<html>
                <textarea cols="30" rows="10" id="output">%s</textarea><br />
                <a href="/start" target="f" onclick="log(this.innerHTML)">start</a><br />
                <a href="/stop" target="f" onclick="log(this.innerHTML)">stop</a><br />
                <iframe name="f" width="100" height="30"></iframe>
                <script>
                    ws = new WebSocket("ws://localhost:8888/stream");
                    out_el = document.getElementById("output");
                    function log(data) {out_el.value = data + "\\n" + out_el.value;}
                    ws.onmessage = function (ev) {log(ev.data);}
                </script>""" % "\n".join(map(str, reversed(self.application.read_data))))
    
    
    class StartHandler(web.RequestHandler):
        def get(self):
            self.application.start_reader()
            self.write("Started")
    
    
    class StopHandler(web.RequestHandler):
        def get(self):
            self.application.stop_reader()
            self.write("Stopped")
    
    
    class WebSocketHandler(websocket.WebSocketHandler):
        connections = set()
    
        def open(self):
            WebSocketHandler.connections.add(self)
    
        def on_close(self):
            if self in WebSocketHandler.connections:
                WebSocketHandler.connections.remove(self)
    
    
    class WebApplication(web.Application):
        def __init__(self, autostart=False):
            handlers = [
                (r"/", IndexHandler),
                (r"/start", StartHandler),
                (r"/stop", StopHandler),
                (r'/stream', WebSocketHandler),
            ]
            web.Application.__init__(self, handlers)
            self._reader_executor = ThreadPoolExecutor(1)
            self._keep_reading = None
            self.read_data = []
            if autostart:
                self.start_reader()
        
        def start_reader(self):
            if not self._keep_reading:
                self._keep_reading = True
                loop = ioloop.IOLoop.current()
                self._reader_future = loop.run_in_executor(self._reader_executor, functools.partial(self.reader, loop))
        
        def stop_reader(self):
            if self._keep_reading:
                self._keep_reading = False
                self._reader_future.cancel()
        
        def notify_clients(self, data=None):
            for con in WebSocketHandler.connections:
                try:
                    con.write_message("{}".format(data))
                except Exception as ex:
                    log("error sending to {}".format(con))
        
        def reader(self, main_loop):
            import random
            import time
            while self._keep_reading:
                time.sleep(1 + random.random())  # simulate read - block for some time
                data = random.getrandbits(32)
                print("reader: data={}".format(data))
                if data:
                    main_loop.add_callback(self.notify_clients, data)
                    self.read_data.append(data)
                time.sleep(0.1)
    
    
    if __name__ == "__main__":
        app = WebApplication(True)
        app.listen(8888)
        loop = ioloop.IOLoop.current()
        try:
            loop.start()
        except KeyboardInterrupt as ex:
            app.stop_reader()
            for con in WebSocketHandler.connections:
                con.close()
            loop.stop()