tornado asynchronous queue not waiting


I modified the example queue with producer & consumer from this Tornado documentation, but it doesn't seem like the timeout parameter passed to get() is working at all, as the consumer doesn't wait 10 seconds before throwing the exception. Ideally, the producer and consumer would run at the same time. Also, I don't know whether to pass in the timeout parameter as seconds or milliseconds:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue()

@gen.coroutine
def consumer():
    try:
        while True:
            item = yield q.get(timeout=10000)
            try:
                print('Doing work on %s' % item)
            finally:
                q.task_done()
    except gen.TimeoutError:
        print('timeout')
        return

@gen.coroutine
def producer():
    for item in range(5):
        yield q.put(item)
        print('Put %s' % item)
        yield gen.sleep(2)

@gen.coroutine
def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    yield producer()     # Wait for producer to put all tasks.
    yield q.join()       # Wait for consumer to finish all tasks.
    print('Done')

IOLoop.current().run_sync(main)

and here is its execution:

Put 0
Doing work on 0
timeout
Put 1
Put 2
Put 3
Put 4

Solution

  • The timeout

    As you can read in Tornado's Queue.get docs:

    Returns a Future which resolves once an item is available or raises tornado.gen.TimeoutError after a timeout.

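    This wording can be misleading: a plain number passed as timeout is treated as a deadline, that is, an absolute time on the same scale as IOLoop.time(), not as a duration. To wait for a relative period, pass a datetime.timedelta object: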

    import datetime
    yield q.get(timeout=datetime.timedelta(seconds=1))
    

    or pass an absolute time:

    timeout = 1.5  # in seconds, floats acceptable
    deadline = IOLoop.current().time() + timeout
    # in most cases IOLoop.time() is just time.time()
    # separate variables are used here only for clarity
    
    yield q.get(timeout=deadline)
    

    In Toro, which was merged into Tornado, this argument was called deadline.

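    In your code you passed timeout=10000, which is interpreted as a deadline of Thu, 01 Jan 1970 02:46:40 GMT. That moment is long past, so get() raises TimeoutError as soon as it finds the queue empty instead of waiting.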
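
The rule can be sketched in plain Python. This is only an illustration of the behaviour described above, not Tornado's own code: `to_deadline` and `as_utc` are hypothetical helpers, and `now` is an arbitrary stand-in for the value of IOLoop.time().

```python
import datetime

def to_deadline(timeout, now):
    """Hypothetical helper: resolve a timeout to an absolute deadline in seconds.

    Assumes the rule described above: a timedelta is relative to `now`,
    while a plain number is already an absolute time.
    """
    if isinstance(timeout, datetime.timedelta):
        return now + timeout.total_seconds()
    return float(timeout)

def as_utc(seconds):
    """Render seconds since the epoch as a timezone-aware UTC datetime."""
    return datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc)

now = 1700000000.0  # illustrative "current time", seconds since the epoch

print(as_utc(to_deadline(10000, now)))                          # 1970-01-01 02:46:40+00:00
print(to_deadline(10000, now) < now)                            # True: already expired
print(to_deadline(datetime.timedelta(seconds=10), now) - now)   # 10.0
print(to_deadline(now + 10, now) - now)                         # 10.0
```

Both of the last two forms describe "ten seconds from now"; the bare 10000 describes a moment in 1970.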

  • Consumer loop

    Since the try/except block wraps the whole function, including the while loop, the consumer stops working as soon as a TimeoutError occurs. Move the exception handling inside the while loop.
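
The effect of handling the timeout inside the loop can be checked without Tornado. The sketch below swaps in the standard library's asyncio as a stand-in (note that asyncio.wait_for takes a relative timeout in seconds, unlike the deadline discussed above); the names `consumer`, `demo` and `log`, and the short delays, are assumptions chosen only to keep the demo fast.

```python
import asyncio

async def consumer(queue, log, timeout=0.04):
    """Consume forever; a timeout is handled per iteration, so the loop survives it."""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            log.append('timeout')
            continue  # keep waiting for the next item
        try:
            log.append('work %s' % item)
        finally:
            queue.task_done()

async def demo():
    queue = asyncio.Queue()
    log = []
    # Start the consumer without waiting for it (it never finishes on its own).
    task = asyncio.ensure_future(consumer(queue, log))
    for item in range(3):
        await queue.put(item)
        await asyncio.sleep(0.1)  # longer than the consumer's timeout
    await queue.join()            # wait until every item has been processed
    task.cancel()
    return log

if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Timeouts show up in the log between items, yet every item is still processed, because the exception never escapes the while loop.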

    Working example:

    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.queues import Queue
    
    q = Queue()
    
    @gen.coroutine
    def consumer():
        i = 0
        while True:
            i += 1
            print('get cycle %s' % i)
            try:
                item = yield q.get(timeout=IOLoop.current().time() + 3)
                try:
                    print('Doing work on %s' % item)
                finally:
                    q.task_done()
            except gen.TimeoutError:
                print('timeout')
    
    @gen.coroutine
    def producer():
        for item in range(5):
            yield q.put(item)
            print('Put %s' % item)
            yield gen.sleep(2)
    
    @gen.coroutine
    def main():
        # Start consumer without waiting (since it never finishes).
        IOLoop.current().spawn_callback(consumer)
        yield producer()     # Wait for producer to put all tasks.
        yield q.join()       # Wait for consumer to finish all tasks.
        print('Done')