Search code examples
pythontornado

Python Tornado - How to Implement Long-Polling Server to Read from a Queue


I'm trying to build a web server to collect "commands" via AJAX and then distribute the commands to clients via long-polling.

The goal is that someone POSTs some data to /add-command.

Another client implements a long-polling client hitting /poll waiting for a command to execute.

I think a queue is the right data structure to use to hold commands waiting for attention. I'd like the commands to essentially be distributed immediately to any long-polling client but held if no client is currently polling.

Here's my python script.

import os
import time
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen
import Queue
import multiprocessing.pool
import mysql.connector
import urlparse
import uuid
import json

_commandQueue = Queue.Queue()
_commandPollInterval = 0.2
_commandPollTimeout = 10

class HomeHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("home.htm")

class AddCommandHandler(tornado.web.RequestHandler):
    def post(self):
        d = urlparse.parse_qs(self.request.body)
        _commandQueue.put(d)
        self.write(str(True))

class PollHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        self.write("start")
        d = 1
        d = yield self.getCommand()
        self.write(str(d))
        self.write("end")
        self.finish()
    @tornado.gen.coroutine
    def getCommand(self):
        start = time.time()
        while (time.time() - start) < _commandPollTimeout * 1000:
            if not _commandQueue.empty:
                return _commandQueue.get()
            else:
                time.sleep(_commandPollInterval)
        return None 

def main():
    application = tornado.web.Application(
        [
            (r"/", HomeHandler),
            (r"/add-command", AddCommandHandler),
            (r"/poll", PollHandler),
        ], 
        debug=True, 
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
    )
    tornado.httpserver.HTTPServer(application).listen(int(os.environ.get("PORT", 5000)))
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()

The AddCommandHandler works fine to put items in the _commandQueue.

The PollHandler request just times out. If I call the PollHandler, it seems to lock the _commandQueue and I can't put or get from it.

I suspect I need to join the queue, but I can't seem to find the right time to do that in the code.

UPDATE -- Here's my final code thanks to the answers

import os
import time
import datetime
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen
import tornado.queues
import urlparse
import json

_commandQueue = tornado.queues.Queue()
_commandPollInterval = 0.2
_commandPollTimeout = 10

class HomeHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("home.htm")

class AddCommandHandler(tornado.web.RequestHandler):
    def get(self):
        cmd = urlparse.parse_qs(self.request.body)
        _commandQueue.put(cmd)
        self.write(str(cmd))
    def post(self):
        cmd = urlparse.parse_qs(self.request.body)
        _commandQueue.put(cmd)
        self.write(str(cmd))

class PollHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        cmd = yield self.getCommand()
        self.write(str(cmd))
    @tornado.gen.coroutine
    def getCommand(self):
        try:
            cmd = yield _commandQueue.get(
                timeout=datetime.timedelta(seconds=_commandPollTimeout)
            )
            raise tornado.gen.Return(cmd)
        except tornado.gen.TimeoutError:
            raise tornado.gen.Return()

def main():
    application = tornado.web.Application(
        [
            (r"/", HomeHandler),
            (r"/add-command", AddCommandHandler),
            (r"/poll", PollHandler),
        ], 
        debug=True, 
        template_path=os.path.join(os.path.dirname(__file__), "templates"),
        static_path=os.path.join(os.path.dirname(__file__), "static"),
    )
    tornado.httpserver.HTTPServer(application).listen(int(os.environ.get("PORT", 5000)))
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()

Solution

  • In async model you should omit blocking operation, time.sleep is evil in your code. Moreover, I think that the best way is to use tornado's (in async interface) queue - tornado.queue.Queue and use async get:

    import datetime
    import tornado.gen
    import tornado.queues
    
    _commandQueue = tornado.queues.Queue()
    
    
        # ...rest of the code ...
    
        @tornado.gen.coroutine
        def getCommand(self):
            try:
                # wait for queue item if cannot obtain in timeout raise exception
                cmd = yield _commandQueue.get(
                    timeout=datetime.timedelta(seconds=_commandPollTimeout)
                )
                return cmd
            except tornado.gen.Timeout:
                return None
    

    Note: Module tornado.queues si available since Tornado 4.x, if you use older one, Toro will help.