I modified the example queue with producer & consumer from this Tornado documentation, but it doesn't seem like the timeout parameter passed to get() is working at all, as the consumer doesn't wait 10 seconds before throwing the exception. Ideally, the producer and consumer would run at the same time. Also, I don't know whether to pass in the timeout parameter as seconds or milliseconds:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue
q = Queue()
@gen.coroutine
def consumer():
try:
while True:
item = yield q.get(timeout=10000)
try:
print('Doing work on %s' % item)
finally:
q.task_done()
except gen.TimeoutError:
print('timeout')
return
@gen.coroutine
def producer():
for item in range(5):
yield q.put(item)
print('Put %s' % item)
yield gen.sleep(2)
@gen.coroutine
def main():
# Start consumer without waiting (since it never finishes).
IOLoop.current().spawn_callback(consumer)
yield producer() # Wait for producer to put all tasks.
yield q.join() # Wait for consumer to finish all tasks.
print('Done')
IOLoop.current().run_sync(main)
and here is its execution:
Put 0
Doing work on 0
timeout
Put 1
Put 2
Put 3
Put 4
The timeout
As you can read in Tornado' Queue.get docs:
Returns a Future which resolves once an item is available or raises tornado.gen.TimeoutError after a timeout.
But it could be quite misleading, since timeout
is actually a deadline
. So it must be specified either with datetime.timedelta object:
import datetime
yield q.get(timeout=datetime.timedelta(seconds=1))
or absolute time:
timeout = 1.5 # in seconds, floats acceptable
deadline = IOLoop.current().time() + timeout
# in most cases IOLoop time is just time.time()
# I've used separate variables only for the notion
yield q.get(timeout=deadline)
In Toro, that was merged into Tornado, this argument was called deadline
.
In your code you specified timeout 10000
, that means deadline to Thu, 01 Jan 1970 02:46:40 GMT
.
Consumer loop
Since you have try
/except
block on the whole function, including while
loop, when TimeoutError
occurs your consumer stop working. Move exception handling into while
loop.
Working example:
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue
q = Queue()
@gen.coroutine
def consumer():
i = 0
while True:
i += 1
print('get cycle %s' % i)
try:
item = yield q.get(IOLoop.instance().time() + 3)
try:
print('Doing work on %s' % item)
finally:
q.task_done()
except gen.TimeoutError:
print('timeout')
@gen.coroutine
def producer():
for item in range(5):
yield q.put(item)
print('Put %s' % item)
yield gen.sleep(2)
@gen.coroutine
def main():
# Start consumer without waiting (since it never finishes).
IOLoop.current().spawn_callback(consumer)
yield producer() # Wait for producer to put all tasks.
yield q.join() # Wait for consumer to finish all tasks.
print('Done')