Search code examples
json python-2.7 websocket webserver tornado

Working with coroutines in Python Tornado Web Server


I am working on an autonomous car implementation for a web browser game with Python 2.x. I use the Tornado web server to run the game on localhost, and I post and receive data from the game in JSON format in the handler called "FrameHandler". I also determine what the car's action should be in the "to_dict_faster()" function.

My problem is this: with the help of a coroutine, I can write the data held in the speed_data variable to a text file at a specific time interval. However, I can't dump JSON data back to the game at that same interval, because "FrameHandler" behaves like a "while True" loop and is constantly asked for data to dump. What I am trying to do is send the desired actions at a specific time interval, just as I write to the text file, without changing the flow of the frame handler, because that affects the FPS of the game.

I have been trying to figure out how to do that for a long time; any help would be great. Here is my code:

@gen.coroutine
def sampler():
    """Append the current speed sample to ``Sampled_Speed.txt`` forever.

    On every tick the module-level ``speed_data`` (assigned in
    ``FrameHandler.post``) is written as ``"<speed>,<elapsed>"``, where
    ``elapsed`` is the time in seconds since this coroutine started.
    ``period`` is read from module scope and is the pause between samples,
    in seconds.  The coroutine never returns.
    """
    start = time.time()
    while True:
        # Reopen in append mode on each tick so the sample is closed out to
        # the file before the coroutine goes back to sleep.
        with open("Sampled_Speed.txt", "a") as text_file:
            text_file.write("%d,%.2f\n" % (speed_data, time.time() - start))
        # Hand control back to the IOLoop so request handlers keep running
        # between samples.
        yield gen.sleep(period)

class MainHandler(tornado.web.RequestHandler):
    """Redirect requests for the site root to the game's static page."""

    def get(self):
        """Answer a GET request with a redirect to the static game page."""
        self.redirect("/static/v2.curves.html")

class FrameHandler(tornado.web.RequestHandler):
    """Receive one game frame and reply with the action to take next."""

    def post(self):
        """Handle one frame posted by the game.

        The first ``telemetry`` argument is parsed as JSON and its
        ``action``, ``terminal``, ``all_data`` and ``was_start`` keys are
        read.  The request body is base64-decoded into a ``uint8`` array and
        reshaped to ``(hp.INPUT_SIZE, hp.INPUT_SIZE, hp.NUM_CHANNELS)``.

        The response is the JSON encoding of ``to_dict_faster()`` when the
        latest speed is below 4000, otherwise of ``to_dict_constant()``, both
        called on the object returned by ``agent.steps``.

        Side effect: the module-level ``speed_data`` is set to the ``speed``
        of the last entry of ``all_data``.  When ``all_data`` is empty the
        previous value is kept, so ``speed_data`` must already exist at
        module level in that case.
        """
        global speed_data
        data = json.loads(self.get_arguments("telemetry")[0])
        ar = np.fromstring(base64.decodestring(self.request.body), dtype=np.uint8)
        image = ar.reshape(hp.INPUT_SIZE, hp.INPUT_SIZE, hp.NUM_CHANNELS)
        left, right, faster, slower = data["action"]
        terminal, action, all_data, was_start = (
            data["terminal"],
            Action(left=left, right=right, faster=faster, slower=slower),
            data["all_data"],
            data["was_start"]
        )

        # Walk the entries directly; after the loop speed_data holds the
        # speed of the final entry, which is the only value used below.
        for entry in all_data:
            speed_data = entry[u'speed']

        result_action = agent.steps(image, 0.1, terminal, was_start, action, all_data)

        # Pick the response payload by speed, then serialise it once.
        if speed_data < 4000:
            response = result_action.to_dict_faster()
        else:
            response = result_action.to_dict_constant()
        self.write(json.dumps(response))

def make_app():
    """Build the Tornado application.

    Routes:
      * ``/``            -> ``MainHandler``
      * ``/frame``       -> ``FrameHandler``
      * ``/static/(.*)`` -> Tornado's ``StaticFileHandler`` rooted at the
        module-level ``static_path``

    The application is created with ``debug=True``.
    """
    return tornado.web.Application([
        (r"/", MainHandler),
        (r"/frame", FrameHandler),
        (r"/static/(.*)", tornado.web.StaticFileHandler, {"path": static_path}),
    ], debug=True)

if __name__ == "__main__":
    app = make_app()
    # The port can be overridden through the SERVER_PORT environment
    # variable; 8880 is used when it is not set.
    port = int(os.environ.get("SERVER_PORT", 8880))
    # Parenthesised form prints the same text under Python 2 and Python 3.
    print("LISTENING ON PORT: %d" % port)
    app.listen(port)
    # sampler() never finishes, so it is scheduled as a background coroutine
    # rather than passed to run_sync(); the start() call below is then what
    # actually runs the loop.
    tornado.ioloop.IOLoop.current().spawn_callback(sampler)
    tornado.ioloop.IOLoop.current().start()

Solution

  • You can move the file writing to a different thread (using Tornado's run_on_executor, for example), so the Python interpreter will automatically switch from the Sampler thread to the main thread running FrameHandler while a write is in progress. However, you have to make the speed_data variable thread-safe; I've used the stdlib Queue.Queue as an example:

    class Handler(tornado.web.RequestHandler):
        """Demo handler that puts a test payload on the shared queue."""

        @gen.coroutine
        def get(self):
            """Enqueue one test string on ``speed_data`` and reply ``OK``."""
            # speed_data is only read here, never rebound, so no ``global``
            # declaration is needed to reach the module-level queue.
            payload = "REALLY BIG TEST DATA\n"
            speed_data.put(payload)
            self.finish("OK")
    
    
    class Sampler(object):
        """Drain a queue into ``foobar.txt`` on a worker thread."""

        # One worker only: write_sample() loops forever, so it occupies this
        # thread once it has been started.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

        def __init__(self, queue):
            """Store the queue that ``write_sample`` will read from."""
            self._q = queue

        @run_on_executor
        def write_sample(self):
            """Write every item taken from the queue to ``foobar.txt``.

            Submitted to ``executor`` by ``run_on_executor``.  The file is
            opened in ``"w"`` mode, so existing contents are truncated.  The
            loop blocks on ``queue.get()`` and never returns.
            """
            with open("foobar.txt", "w") as f:
                while True:
                    data = self._q.get()
                    f.write(data)
                    # Flush after each item so samples reach the file
                    # promptly instead of waiting in the write buffer.
                    f.flush()
    
    
    if __name__ == '__main__':
        # The queue is shared by Handler (producer) and Sampler (consumer).
        speed_data = Queue.Queue()
        sample_writer = Sampler(speed_data)

        # Single route: /status is served by Handler.
        routes = [("/status", Handler)]
        http_server = HTTPServer(Application(routes))
        http_server.listen(8888)

        # Start the writer once the loop is running, then run the loop.
        io_loop = IOLoop.current()
        io_loop.add_callback(sample_writer.write_sample)
        io_loop.start()