Tags: python, tornado, future

How can I know that all Futures are resolved in Tornado?


I have a parsing application and it basically does the following (a rough sketch follows the list):

  • Adds the start() method to the IOLoop as a callback to be called on the next iteration
  • start() calls another function, let's call it get_input()
  • get_input() is a coroutine which fetches some data over the net, then schedules another coroutine, process_input(), by adding it the same way start() was added in the first step.
  • get_input() also checks some condition which depends on the fetched data and may schedule itself again with adjusted arguments

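Roughly, I mean something like this (a toy sketch, not the real app; the page-number condition and gen.sleep stand in for the actual network logic):

    from tornado import gen
    from tornado.ioloop import IOLoop

    @gen.coroutine
    def fetch_data_over_net(page):
        # stand-in for the real network fetch
        yield gen.sleep(0.01)
        raise gen.Return({"page": page, "items": range(3)})

    @gen.coroutine
    def process_input(data):
        print("processing", data)

    @gen.coroutine
    def get_input(page):
        data = yield fetch_data_over_net(page)
        # schedule processing the same way start() was scheduled: fire and forget
        IOLoop.current().add_callback(process_input, data)
        # condition on the fetched data: maybe reschedule with adjusted arguments
        if page < 3:
            IOLoop.current().add_callback(get_input, page + 1)

    def start():
        IOLoop.current().add_callback(get_input, 1)

    IOLoop.current().add_callback(start)
    # Problem: nothing here tells us when it is safe to stop the IOLoop
    IOLoop.current().start()
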
Now, after this condition evaluates to False, I know that there won't be any new input items to process.
But how do I know that there are no Futures from get_input() and process_input() which are still unresolved?

I suppose this could be solved by implementing a kind of counter, which is incremented every time process_input() is called and decremented after it resolves.
But what if there is a chain of different coroutines? How can I track their state so that, if I stop the IOLoop, no tasks die before they get resolved?

Maybe there should be some kind of hierarchical counting...

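The simplest version of that counter I can picture looks something like this (just a sketch; track() is my own helper name, not anything from Tornado). Every place that currently does IOLoop.current().add_callback(process_input, data) would instead call track(process_input(data)):

    from tornado.ioloop import IOLoop

    pending = [0]  # number of unresolved futures we have started

    def track(future):
        """Count a future as pending; stop the IOLoop once all tracked futures resolve."""
        pending[0] += 1

        def done(fut):
            pending[0] -= 1
            if pending[0] == 0:
                IOLoop.current().stop()

        # add_future() runs done(fut) on the IOLoop when the future finishes
        IOLoop.current().add_future(future, done)
        return future
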
edit:

@dano OK, I see now... I was inattentive. You really don't block, since get_input()'s own recursive call is inside the yielded list.
But!

  1. Such an organization requires that only the yield construct be used, no add_callback, as otherwise we lose the "waiting" concept
  2. The recursion depth grows... Mmm, I don't know if that's too bad

What I came up with today is a "metafuture".
I create a bare Future() object.
I decorate every @coroutine-enabled method with my own decorator, which increments a counter field on the "metafuture" and adds a custom done callback to their futures that decrements it.

When the counter reaches zero, the "metafuture" resolves by calling set_result(None). There is also an IOLoop callback that yields exactly that metafuture:

    @coroutine
    def wait_for_complete(self):
        yield self._input_data_collected
        yield self._all_futures_resolved
        self.complete()

So after that we know no futures are pending. That's the hard way, like manually implementing refcounting, but it also covers tasks added via IOLoop.add_callback().
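
For reference, a minimal sketch of that "metafuture" idea (MetaFuture and track are names I'm using here, not anything from Tornado):

    from functools import wraps

    from tornado.concurrent import Future
    from tornado.ioloop import IOLoop

    class MetaFuture(object):
        """Resolves once every future created through track() has resolved."""

        def __init__(self):
            self._pending = 0
            self.future = Future()

        def track(self, coro_func):
            """Decorator for @coroutine functions: count their futures in and out."""
            @wraps(coro_func)
            def wrapper(*args, **kwargs):
                self._pending += 1
                fut = coro_func(*args, **kwargs)
                IOLoop.current().add_future(fut, self._on_done)
                return fut
            return wrapper

        def _on_done(self, fut):
            self._pending -= 1
            # Caveat: if every tracked future resolves before the next one is
            # registered, this fires early -- the same caveat as any refcount.
            if self._pending == 0 and not self.future.done():
                self.future.set_result(None)

Apply @meta.track above each @coroutine-decorated method, and wait_for_complete() then yields meta.future (the self._all_futures_resolved in the snippet above).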


Solution

  • You could write your methods so that they don't return until all the work is done, rather than scheduling callbacks. Then you could just call IOLoop.run_sync(start) and the call won't return until all the processing is complete:

    from tornado import gen
    from tornado.ioloop import IOLoop
    
    @gen.coroutine
    def start():
        yield get_input()
    
    @gen.coroutine
    def get_input(*args, **kwargs):
        data = yield fetch_data_over_net()
        futs = []  # list of Future objects
        futs.append(process_input(data))
        if should_call_myself(data):
            futs.append(get_input(newargs, newkwargs))
        yield futs # This will wait for all Future objects in the list to complete.
    
    @gen.coroutine
    def process_input(data):
        # do stuff
        pass
    
    if __name__ == "__main__":
        IOLoop.instance().run_sync(start)
    

    We take advantage of the fact that coroutines return Futures, and that Tornado supports waiting for multiple Futures in parallel, so we can run as much as possible concurrently without ever actually returning from get_input (and therefore start) before all the dependent work is done.
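
    To see the "yield a list of futures" behaviour in isolation, here is a small self-contained toy (fake_fetch is a made-up stand-in; gen.sleep takes the place of the network calls):

    from tornado import gen
    from tornado.ioloop import IOLoop

    @gen.coroutine
    def fake_fetch(n):
        yield gen.sleep(0.1)        # stands in for a real network call
        raise gen.Return(n * 2)

    @gen.coroutine
    def start():
        # All three fake fetches run concurrently; the yield resumes only
        # once every future in the list has resolved.
        results = yield [fake_fetch(n) for n in range(3)]
        print(results)              # [0, 2, 4]

    if __name__ == "__main__":
        IOLoop.instance().run_sync(start)  # returns only when start() finishes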