
push engine in tornado


I am trying to code a simple push engine in Tornado. Basically, I have a program running on my server that continuously produces output, which I process in Python to update a dictionary, and I want that dictionary published to web clients, for example every minute.

I would be thankful if your answer included links to the documentation, or a rephrasing of my own question. I am finding the Tornado documentation hard going, so any help would be appreciated.

Here is a skeleton of the code with comments inside explaining what I want to do:

import subprocess
import sys
import pprint

import tornado.ioloop
import tornado.web

# this is to run my bash process and continuously yield its output
def runProcess(cmd):
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    while True:
        retcode = p.poll()
        line = p.stdout.readline()
        yield line
        if retcode is not None:
            break


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        # What can I do here if I want to send the updated data every minute?
        self.write(data)


    def get_data(self):
        data = dict()
        cmd = 'myProg --args'
        # this program will produce a continuous stream of data
        for line in runProcess(cmd.split()):
            data[line.split()[0]] = line.split()[1]
        # now the dictionary is updated? yield the result?
        # even if I want to publish updates every minute?
        yield data


def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

Solution

  • As @BorrajaX's answer suggested, use WebSockets. There are plenty of examples online showing how to connect to a WebSocket from JavaScript, so I'll leave the client side to you.

    One major issue with your code is that the way you read lines from your process's stdout is blocking: p.stdout.readline() waits until a full line arrives, and while it waits Tornado's single-threaded IOLoop cannot serve any other request. Your code has to hook into the event loop and leave all of the waiting to Tornado itself. Fortunately, Tornado already has a process module, tornado.process.Subprocess, that wraps subprocess.Popen so that its pipes support the IOStream interface:

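    from tornado.gen import coroutine
    from tornado.process import Subprocess

    @coroutine
    def get_data():
        process = Subprocess(['tail', '-f', '/var/log/system.log'], stdout=Subprocess.STREAM)

        while True:
            # read_until() returns a Future; yielding it hands control back to the IOLoop
            # until a full line is available. On Python 3 the delimiter must be bytes.
            line = yield process.stdout.read_until(b'\n')
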
            # Do something with `line`
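
Since Tornado 5, the IOLoop runs on top of asyncio on Python 3, so the same non-blocking pattern can also be written with the standard library's asyncio subprocess API. The following is a minimal sketch of that alternative, not part of Tornado itself; read_lines and its on_line callback are hypothetical names, and the child Python command is only a stand-in for myProg --args.

```python
import asyncio
import sys


async def read_lines(cmd, on_line):
    """Run cmd and call on_line(line) for every line it prints, without blocking the loop."""
    proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE)
    while True:
        # readline() is a coroutine: while no full line is available, the event
        # loop keeps serving other tasks instead of freezing.
        line = await proc.stdout.readline()
        if not line:  # b"" means EOF: the process closed its stdout
            break
        on_line(line)
    return await proc.wait()  # reap the child and return its exit code


if __name__ == "__main__":
    # Stand-in for 'myProg --args': a child Python that prints two "key value" lines.
    demo_cmd = [sys.executable, "-c", "print('cpu 42'); print('mem 17')"]
    asyncio.run(read_lines(demo_cmd, print))
```

Unlike the generator in the question, the loop stops on EOF rather than on poll(), so the last lines a process writes before exiting are not lost.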
    

    One other thing to note is that in order to broadcast messages to all the connected websocket clients, you first need to actually keep track of them. This can be done easily by implementing the open and on_close methods:

    class BroadcastHandler(WebSocketHandler):
        clients = []
    
        def open(self):
            self.clients.append(self)
    
        def on_close(self):
            self.clients.remove(self)
    
        @classmethod
        def broadcast(cls, message):
            for client in cls.clients:
                client.write_message(message)
    

    Once you have that, the `# Do something with line` part becomes quite simple:

    BroadcastHandler.broadcast(line)
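
If you want the dictionary from your question rather than raw lines, the loop body could instead fold each line into a shared dict. A minimal sketch, assuming (as in your own code) that each line looks like "key value ...": parse_line, handle_line and state are hypothetical names, and lines with fewer than two fields are skipped instead of raising IndexError.

```python
def parse_line(raw):
    """Turn one b"key value ..." line into (key, value), or None if it has < 2 fields."""
    fields = raw.decode("utf-8", errors="replace").split()
    if len(fields) < 2:
        return None  # blank or one-word line: nothing to store
    return fields[0], fields[1]  # same fields the question used: split()[0] and split()[1]


# Latest value seen for each key; a periodic publisher can send a copy of it.
state = {}


def handle_line(raw):
    """Fold one line of process output into `state` (later lines overwrite earlier ones)."""
    parsed = parse_line(raw)
    if parsed is not None:
        key, value = parsed
        state[key] = value


if __name__ == "__main__":
    for raw in [b"cpu 42\n", b"\n", b"mem 17 MB\n", b"cpu 43\n"]:
        handle_line(raw)
    print(state)  # {'cpu': '43', 'mem': '17'}
```

With this in place, the reading loop would call handle_line(line) instead of broadcasting every raw line, and the dictionary would be published separately on a timer.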
    

    Here's something to start off with:

    from tornado.gen import coroutine
    from tornado.web import Application
    from tornado.websocket import WebSocketHandler
    from tornado.ioloop import IOLoop
    from tornado.process import Subprocess
    
    
    @coroutine
    def get_data():
        process = Subprocess(['tail', '-f', '/var/log/system.log'], stdout=Subprocess.STREAM)
    
        while True:
            line = yield process.stdout.read_until(b'\n')
            BroadcastHandler.broadcast(line)
    
    
    class BroadcastHandler(WebSocketHandler):
        clients = []
    
        def open(self):
            self.clients.append(self)
    
        def on_close(self):
            self.clients.remove(self)
    
        @classmethod
        def broadcast(cls, message):
            for client in cls.clients:
                client.write_message(message)
    
    
    def create_application():
        # Start get_data() on the first IOLoop iteration; unlike add_future with a
        # no-op callback, spawn_callback logs any exception the coroutine raises
        IOLoop.current().spawn_callback(get_data)
    
        app = Application([
            (r'/broadcast', BroadcastHandler),
        ])
    
        return app
    
    
    if __name__ == '__main__':
        app = create_application()
        app.listen(8888)
    
        IOLoop.current().start()
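
To publish a dictionary every minute, as the question asks, Tornado provides tornado.ioloop.PeriodicCallback(callback, callback_time), where callback_time is in milliseconds and .start() begins the schedule; its callback could broadcast a copy of the dict (WebSocketHandler.write_message encodes a dict as JSON). The sketch below shows the same idea as a plain asyncio coroutine so it can be tried without Tornado; publish_every and its rounds parameter are hypothetical, and JSON text via json.dumps is an assumed wire format.

```python
import asyncio
import json


async def publish_every(interval, get_snapshot, publish, rounds=None):
    """Every `interval` seconds, send a JSON snapshot of the current state to `publish`.

    `rounds` caps the number of sends (None = forever); it only exists so the
    sketch can be tried without running indefinitely.
    """
    sent = 0
    while rounds is None or sent < rounds:
        # sleep() yields to the event loop, so WebSocket handlers keep running meanwhile.
        await asyncio.sleep(interval)
        publish(json.dumps(get_snapshot(), sort_keys=True))
        sent += 1


if __name__ == "__main__":
    latest = {"mem": "17", "cpu": "42"}  # stand-in for the dict your reader updates
    asyncio.run(publish_every(0.1, lambda: dict(latest), print, rounds=2))
```

In the application above it could be started next to get_data, e.g. IOLoop.current().spawn_callback(publish_every, 60, lambda: dict(latest), BroadcastHandler.broadcast), where latest is a hypothetical name for whatever dictionary your reading loop updates. Unlike PeriodicCallback, this loop drifts slightly because each sleep starts after the previous publish; for once-a-minute updates that rarely matters.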
    

    Storing clients in a plain list is not the most efficient approach (list.remove has to scan the whole list), but it is good enough for a few hundred clients that stay connected for long periods.
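
If you need more clients, or just want removal to be cheap, a set is the usual fix. Below is a minimal sketch of the same tracking pattern pulled out into a small class so it can be exercised with stand-in objects; ClientRegistry and FakeClient are hypothetical names, and the only thing the registry assumes about a client is a write_message(message) method, which Tornado's WebSocketHandler provides.

```python
class ClientRegistry:
    """Set-based client tracking: O(1) average add/discard instead of list.remove's O(n)."""

    def __init__(self):
        self._clients = set()

    def add(self, client):
        self._clients.add(client)

    def discard(self, client):
        self._clients.discard(client)  # silently ignores clients that are already gone

    def broadcast(self, message):
        # Iterate over a copy: if anything triggered by write_message() discards a
        # client, mutating the set mid-iteration would raise RuntimeError.
        for client in list(self._clients):
            client.write_message(message)

    def __len__(self):
        return len(self._clients)


class FakeClient:
    """Stand-in for a WebSocket handler, exposing the one method the registry uses."""

    def __init__(self):
        self.received = []

    def write_message(self, message):
        self.received.append(message)


if __name__ == "__main__":
    registry = ClientRegistry()
    a, b = FakeClient(), FakeClient()
    registry.add(a)
    registry.add(b)
    registry.broadcast("hello")
    registry.discard(b)
    registry.broadcast("again")
    print(a.received, b.received)  # ['hello', 'again'] ['hello']
```

In BroadcastHandler you would keep one module-level registry and call registry.add(self) in open and registry.discard(self) in on_close; handler instances hash by identity by default, so they can be stored in a set.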