
Creating a processing queue in Tornado


I'm using a Tornado web server to queue up items that need to be processed outside of the request/response cycle.

In my simplified example below, every time a request comes in, I add a new string to a list called queued_items. I want to create something that will watch that list and process the items as they show up in it.

(In my real code, the items are processed and sent over a TCP socket, which may or may not be connected when the web request arrives. I want the web server to keep queuing up items regardless of the socket connection.)

I'm trying to keep this code simple and not use external queues/programs like Redis or Beanstalk. It's not going to have very high volume.

What's a good way using Tornado idioms to watch the client.queued_items list for new items and process them as they arrive?

import time

import tornado.ioloop
import tornado.gen
import tornado.web

class Client():

    def __init__(self):
        self.queued_items = []

    @tornado.gen.coroutine
    def watch_queue(self):
        # I have no idea what I'm doing
        items = yield client.queued_items
        # go_do_some_thing_with_items(items)

class IndexHandler(tornado.web.RequestHandler):

    def get(self):
        client.queued_items.append("%f" % time.time())
        self.write("Queued a new item")

if __name__ == "__main__":

    client = Client()

    # Watch the queue for when new items show up
    client.watch_queue()

    # Create the web server 
    application = tornado.web.Application([
        (r'/', IndexHandler),
    ], debug=True)

    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

Solution

  • There is a library called toro that provides synchronization primitives for Tornado. [Update: As of Tornado 4.2, toro has been merged into Tornado itself as the tornado.queues and tornado.locks modules.]

    It sounds like you could just use a toro.Queue (or tornado.queues.Queue in Tornado 4.2+) to handle this:

    import time
    
    import toro
    import tornado.ioloop
    import tornado.gen
    import tornado.web
    
    class Client():
    
        def __init__(self):
            self.queued_items = toro.Queue()
    
        @tornado.gen.coroutine
        def watch_queue(self):
            while True:
                # get() waits until an item is available, then returns that one item
                item = yield self.queued_items.get()
                # go_do_something_with_item(item)
    
    class IndexHandler(tornado.web.RequestHandler):
    
        @tornado.gen.coroutine
        def get(self):
            yield client.queued_items.put("%f" % time.time())
            self.write("Queued a new item")
    
    if __name__ == "__main__":
    
        client = Client()
    
        # Watch the queue for when new items show up
        tornado.ioloop.IOLoop.current().add_callback(client.watch_queue)
    
        # Create the web server 
        application = tornado.web.Application([
            (r'/', IndexHandler),
        ], debug=True)
    
        application.listen(8888)
        tornado.ioloop.IOLoop.current().start()
    

    There are a few tweaks required, aside from switching the data structure from a list to a toro.Queue:

    1. We need to schedule watch_queue to run inside the IOLoop using add_callback, rather than trying to call it directly outside of an IOLoop context.
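    2. IndexHandler.get needs to be converted to a coroutine, because toro.Queue.put returns a Future that has to be yielded. On a bounded queue (one created with a maxsize), that Future does not resolve until there is room.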
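To see why the handler has to wait on put(), here is a small sketch using tornado.queues.Queue, the Tornado 4.2+ successor to toro.Queue. It assumes Tornado 6 on Python 3.8+. With a bounded queue (maxsize=1), the second put() does not complete until a consumer has taken an item out. The names demo, consumer and events are illustrative only.

```python
import asyncio

import tornado.queues


async def demo():
    events = []  # illustrative log of the order in which things happen
    q = tornado.queues.Queue(maxsize=1)  # bounded: holds at most one item

    async def consumer():
        for _ in range(2):
            item = await q.get()
            events.append("got " + item)
            q.task_done()
            await asyncio.sleep(0)  # yield so a blocked producer can resume

    task = asyncio.create_task(consumer())
    await q.put("a")  # there is room, so this completes immediately
    events.append("put a")
    await q.put("b")  # the queue is full, so this waits for the consumer
    events.append("put b")
    await q.join()  # wait until every item has been marked task_done()
    await task
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the printed list, "put b" only appears after "got a". On an unbounded Queue(), as in the answer, the Future returned by put() is already resolved, so the handler does not actually wait. Yielding it still keeps the code correct if a maxsize is added later. Tornado's Queue also offers put_nowait() for the unbounded case.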

    I also added a while True loop to watch_queue, so that it will run forever, rather than just processing one item and then exiting.
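On newer Tornado versions the same program can be written with tornado.queues.Queue and native async/await. What follows is a minimal sketch, assuming Tornado 6 on Python 3.8+, not a definitive implementation.

It also tries to address the disconnected-socket case from the question. A hypothetical connected flag (a tornado.locks.Event) pauses the worker while the socket is down, so requests keep queuing items in the meantime. The names send, connected, sent, make_app and main are illustrative and are not part of Tornado. In particular, send is a stand-in for the real socket write.

```python
import asyncio
import time

import tornado.locks
import tornado.queues
import tornado.web


class Client:
    def __init__(self):
        # Unbounded queue, so put() completes immediately.
        self.queued_items = tornado.queues.Queue()
        # Hypothetical flag: set while the TCP socket is connected, cleared when it drops.
        self.connected = tornado.locks.Event()
        self.sent = []  # hypothetical record of delivered items, for illustration only

    async def send(self, item):
        # Hypothetical placeholder for writing the item to the TCP socket.
        self.sent.append(item)

    async def watch_queue(self):
        # Loop forever, handling one item at a time in arrival order.
        while True:
            item = await self.queued_items.get()
            try:
                # While disconnected, this item waits here and later items stay queued.
                await self.connected.wait()
                await self.send(item)
            finally:
                self.queued_items.task_done()


class IndexHandler(tornado.web.RequestHandler):
    def initialize(self, client):
        # The Client is passed in via the URL spec instead of a module-level global.
        self.client = client

    async def get(self):
        # Queue the item whether or not the socket is connected.
        await self.client.queued_items.put("%f" % time.time())
        self.write("Queued a new item")


def make_app(client):
    return tornado.web.Application([(r"/", IndexHandler, dict(client=client))])


async def main():
    client = Client()
    # Start the worker on the running event loop; keep a reference to the task.
    worker = asyncio.create_task(client.watch_queue())
    make_app(client).listen(8888)
    client.connected.set()  # hypothetical: call this once the socket connects
    await asyncio.Event().wait()  # serve forever
```

Start it with asyncio.run(main()), for example under an `if __name__ == "__main__":` guard. Calling task_done() in a finally block keeps queued_items.join() accurate. A real send() would also need error handling so that a socket failure does not end the worker loop.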