Longtime didn't use tornado. I would like to have a websocket which get's updates from a serial device of a host where tornado runs. So I tried multiprocessing with tornado but the process has no access to the tornado websocket. I tried to incorporate it as coroutine but that seems to not not spawn.
class WebApplication(tornado.web.Application):
def __init__(self):
handlers = [
(r'/', IndexPageHandler),
(r"/config", ConfigHandler),
(r"/shutdown", ShutdownHandler),
(r'/websocket', WebSocketHandler),
(r'/(.*)', tornado.web.StaticFileHandler, {'path': resourcesWeb})
]
settings = {
'debug': debug,
'static_path': resourcesWeb,
'template_path': 'templates'
}
tornado.web.Application.__init__(self, handlers, **settings)
@gen.coroutine
def serial_reader(self):
log('serial_reader: start')
done = False
while not done:
sh.read()
serial_data_from = str(sh.data)
if len(serial_data_from) > 0:
if debug:
log('serial read:' + serial_data_from)
yield [con.write_message(serial_data_from) for con in WebSocketHandler.connections]
yield gen.sleep(0.3)
log('serial_reader: exit')
Python 3.8.5, Tornad 6.1
how would I properly and constantly update a websocket with data from outside the the tornado app
Since sh.read
is blocking, you'll need to run it in an executor. To then notify clients in the main thread, you'll need to use IOLoop.add_callback
(safe to call from any thread). This also means the reader method becomes a regular sync method.
Example:
from concurrent.futures import ThreadPoolExecutor
import functools
from tornado import web, websocket, ioloop
log = print
class IndexHandler(web.RequestHandler):
def get(self):
self.write("""<html>
<textarea cols="30" rows="10" id="output">%s</textarea><br />
<a href="/start" target="f" onclick="log(this.innerHTML)">start</a><br />
<a href="/stop" target="f" onclick="log(this.innerHTML)">stop</a><br />
<iframe name="f" width="100" height="30"></iframe>
<script>
ws = new WebSocket("ws://localhost:8888/stream");
out_el = document.getElementById("output");
function log(data) {out_el.value = data + "\\n" + out_el.value;}
ws.onmessage = function (ev) {log(ev.data);}
</script>""" % "\n".join(map(str, reversed(self.application.read_data))))
class StartHandler(web.RequestHandler):
def get(self):
self.application.start_reader()
self.write("Started")
class StopHandler(web.RequestHandler):
def get(self):
self.application.stop_reader()
self.write("Stopped")
class WebSocketHandler(websocket.WebSocketHandler):
connections = set()
def open(self):
WebSocketHandler.connections.add(self)
def on_close(self):
if self in WebSocketHandler.connections:
WebSocketHandler.connections.remove(self)
class WebApplication(web.Application):
def __init__(self, autostart=False):
handlers = [
(r"/", IndexHandler),
(r"/start", StartHandler),
(r"/stop", StopHandler),
(r'/stream', WebSocketHandler),
]
web.Application.__init__(self, handlers)
self._reader_executor = ThreadPoolExecutor(1)
self._keep_reading = None
self.read_data = []
if autostart:
self.start_reader()
def start_reader(self):
if not self._keep_reading:
self._keep_reading = True
loop = ioloop.IOLoop.current()
self._reader_future = loop.run_in_executor(self._reader_executor, functools.partial(self.reader, loop))
def stop_reader(self):
if self._keep_reading:
self._keep_reading = False
self._reader_future.cancel()
def notify_clients(self, data=None):
for con in WebSocketHandler.connections:
try:
con.write_message("{}".format(data))
except Exception as ex:
log("error sending to {}".format(con))
def reader(self, main_loop):
import random
import time
while self._keep_reading:
time.sleep(1 + random.random()) # simulate read - block for some time
data = random.getrandbits(32)
print("reader: data={}".format(data))
if data:
main_loop.add_callback(self.notify_clients, data)
self.read_data.append(data)
time.sleep(0.1)
if __name__ == "__main__":
app = WebApplication(True)
app.listen(8888)
loop = ioloop.IOLoop.current()
try:
loop.start()
except KeyboardInterrupt as ex:
app.stop_reader()
for con in WebSocketHandler.connections:
con.close()
loop.stop()