
Abandoning Futures in Tornado


I'm considering writing a fan-out proxy in Tornado that queries multiple backend servers, with the possible use case of returning as soon as it has a useful response rather than waiting for all of them.

Is there a problem with the remaining futures if you use a WaitIterator but stop waiting on it after receiving a useful response?

Perhaps the results of the other futures will not be cleaned up? Perhaps callbacks could be added to any remaining futures to discard their results?

#!./venv/bin/python

from tornado import gen
from tornado import httpclient
from tornado import ioloop
from tornado import web
import json


class MainHandler(web.RequestHandler):
    @gen.coroutine
    def get(self):
        r1 = httpclient.HTTPRequest(
            url="http://apihost1.localdomain/api/object/thing",
            connect_timeout=4.0,
            request_timeout=4.0,
        )
        r2 = httpclient.HTTPRequest(
            url="http://apihost2.localdomain/api/object/thing",
            connect_timeout=4.0,
            request_timeout=4.0,
        )
        http = httpclient.AsyncHTTPClient()
        # fan out to both backends concurrently; WaitIterator yields
        # whichever response (or error) arrives first
        wait = gen.WaitIterator(
            r1=http.fetch(r1),
            r2=http.fetch(r2)
        )
        while not wait.done():
            try:
                reply = yield wait.next()
            except Exception as e:
                print("Error {} from {}".format(e, wait.current_future))
            else:
                print("Result {} received from {} at {}".format(
                    reply, wait.current_future,
                    wait.current_index))
                # first successful reply wins: return immediately and leave
                # the other fetch future outstanding
                if reply.code == 200:
                    result = json.loads(reply.body)
                    self.write(json.dumps(dict(result, backend=wait.current_index)))
                    return


def make_app():
    return web.Application([
        (r'/', MainHandler)
    ])


if __name__ == '__main__':
    app = make_app()
    app.listen(8888)
    ioloop.IOLoop.current().start()

Solution

  • So I've checked through the source for WaitIterator.

    It tracks the futures by adding a callback to each one; when a callback fires, the iterator either queues the result or, if you have already called next(), fulfils the future it has handed to you.

    Since the future you wait on is only created by calling .next(), it appears you can exit the while not wait.done() loop without leaving any futures without an observer.

    Reference counting should keep the WaitIterator instance alive until all of the futures have fired their callbacks, after which it can be reclaimed.
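
    For illustration, here is a heavily simplified sketch of that mechanism; it is not Tornado's actual implementation, and every name in it is invented here:

    # Heavily simplified sketch of the mechanism described above; this is
    # NOT Tornado's real code, and all names are invented.
    from tornado.concurrent import Future


    class SketchWaitIterator(object):
        def __init__(self, **futures):
            self._queue = []          # results that arrived with nobody waiting
            self._pending = None      # the future handed out by next()
            self._remaining = len(futures)
            for key, future in futures.items():
                future.add_done_callback(
                    lambda f, key=key: self._on_done(key, f))

        def _on_done(self, key, future):
            # Called whenever one of the input futures finishes.
            self._remaining -= 1
            if self._pending is not None and not self._pending.done():
                # a caller is already waiting in next(): hand the result over
                self._pending.set_result((key, future))
            else:
                # nobody is waiting: queue it, so nothing is left dangling
                self._queue.append((key, future))

        def next(self):
            # The future you wait on is only created here.
            self._pending = Future()
            if self._queue:
                self._pending.set_result(self._queue.pop(0))
            return self._pending

        def done(self):
            return self._remaining == 0 and not self._queue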

    Update 2017/08/02
    Having tested further by subclassing WaitIterator with extra logging: yes, the iterator is cleaned up once all the futures return, but if any of those futures returns an exception, a warning is logged that the exception was never observed.

    ERROR:tornado.application:Future exception was never retrieved: HTTPError: HTTP 599: Timeout while connecting
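
    The instrumented subclass isn't reproduced here; a hypothetical sketch of that kind of lifecycle tracing (using print rather than a logger, and assuming the imports from the code above) might look like:

    class TracingWaitIterator(gen.WaitIterator):
        # Hypothetical subclass used only to watch the iterator's lifecycle.

        def done(self):
            finished = super(TracingWaitIterator, self).done()
            print("done() -> {}".format(finished))
            return finished

        def __del__(self):
            print("WaitIterator instance reclaimed")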

    In summary and answering my question: completing the WaitIterator isn't necessary from a clean-up point of view, but it is probably desirable to do so from a logging point of view.

    If you wanted to be sure, passing the wait iterator to a new coroutine that finishes consuming it, and adding an observer to that coroutine's future, may suffice. For example:

    import logging

    log = logging.getLogger(__name__)


    @gen.coroutine
    def complete_wait_iterator(wait):
        # Drain the iterator so every remaining result or exception is observed.
        rounds = 0
        while not wait.done():
            rounds += 1
            try:
                reply = yield wait.next()
            except Exception as e:
                print("Not needed Error {} from {}".format(e, wait.current_future))
            else:
                print("Not needed result {} received from {} at {}".format(
                    reply, wait.current_future,
                    wait.current_index))
        log.info('completer finished after {n} rounds'.format(n=rounds))
    
    
    class MainHandler(web.RequestHandler):
        @gen.coroutine
        def get(self):
            r1 = httpclient.HTTPRequest(
                url="http://apihost1.localdomain/api/object/thing",
                connect_timeout=4.0,
                request_timeout=4.0,
            )
            r2 = httpclient.HTTPRequest(
                url="http://apihost2.localdomain/api/object/thing",
                connect_timeout=4.0,
                request_timeout=4.0,
            )
            http = httpclient.AsyncHTTPClient()
            wait = gen.WaitIterator(
                r1=http.fetch(r1),
                r2=http.fetch(r2)
            )
            while not wait.done():
                try:
                    reply = yield wait.next()
                except Exception as e:
                    print("Error {} from {}".format(e, wait.current_future))
                else:
                    print("Result {} received from {} at {}".format(
                        reply, wait.current_future,
                        wait.current_index))
                    if reply.code == 200:
                        result = json.loads(reply.body)
                        self.write(json.dumps(dict(result, backend=wait.current_index)))
                        # hand the iterator to the consumer coroutine so the
                        # remaining futures are still observed
                        consumer = complete_wait_iterator(wait)
                        # observe the consumer's own future so any exception
                        # it raises isn't reported as unretrieved either
                        consumer.add_done_callback(lambda f: f.exception())
                        return
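
    Alternatively, along the lines speculated in the question, you could skip the consumer coroutine and attach an observer directly to each fetch future before abandoning them. This is only a sketch of that idea, not something from the original answer; it keeps handles on the futures so their eventual results or exceptions can be marked as retrieved:

    # Relevant part of MainHandler.get(), keeping handles on the fetch futures.
    futures = dict(r1=http.fetch(r1), r2=http.fetch(r2))
    wait = gen.WaitIterator(**futures)
    while not wait.done():
        try:
            reply = yield wait.next()
        except Exception as e:
            print("Error {} from {}".format(e, wait.current_future))
        else:
            if reply.code == 200:
                result = json.loads(reply.body)
                self.write(json.dumps(dict(result, backend=wait.current_index)))
                # retrieve whatever the abandoned futures eventually produce
                # so no "Future exception was never retrieved" is logged
                for f in futures.values():
                    f.add_done_callback(lambda f: f.exception())
                return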