I have a parsing application and it basically does the following:

1. It adds the start() method to the IOLoop as a callback, to be called on the next iteration.
2. start() calls another function, let's call it get_input().
3. get_input() is a coroutine which fetches some data over the net, then schedules another coroutine, process_input(), by adding it as a callback the same way start() was added in the first step.
4. get_input() also checks some condition which depends on the fetched data and may schedule itself with adjusted arguments.

Now, after this condition renders False, I know that there won't be any new input items to process. In code, the structure looks roughly like this (a minimal sketch; fetch_data_over_net(), the page argument and the stop condition are made-up stand-ins for my real code):
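from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def fetch_data_over_net(page):
    # stand-in for the real network fetch
    raise gen.Return({"page": page, "has_more": page < 3})

def start():
    get_input(0)

@gen.coroutine
def get_input(page):
    data = yield fetch_data_over_net(page)
    # fire-and-forget: the Future returned by process_input() is dropped here
    IOLoop.instance().add_callback(process_input, data)
    if data["has_more"]:  # the condition that eventually renders False
        IOLoop.instance().add_callback(get_input, page + 1)

@gen.coroutine
def process_input(data):
    pass  # parse the fetched data

IOLoop.instance().add_callback(start)
IOLoop.instance().start()  # but when is it safe to stop the loop?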
But how do I know that there are no Futures of get_input() and process_input() which are still unresolved?
I suppose this could be solved by implementing a kind of counter, which is incremented every time process_input() is called and decremented after it resolves.
But what if there is a chain of different coroutines? How can I track their state, so that if I stop the IOLoop, no tasks die before they are resolved? Maybe there should be some kind of hierarchical counting...
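Something like this hypothetical helper is what I have in mind (PendingCounter is my own name, not a Tornado API):

from tornado.ioloop import IOLoop

class PendingCounter(object):
    """Hypothetical helper: counts still-unresolved coroutine Futures."""

    def __init__(self, ioloop):
        self.ioloop = ioloop
        self.pending = 0

    def schedule(self, coro_func, *args, **kwargs):
        self.pending += 1
        future = coro_func(*args, **kwargs)  # a coroutine call returns a Future
        self.ioloop.add_future(future, self._resolved)

    def _resolved(self, future):
        future.result()  # re-raise any exception so it is not silently lost
        self.pending -= 1
        if self.pending == 0:  # no unresolved Futures are left
            self.ioloop.stop()

Every coroutine in the chain would have to be scheduled through schedule() instead of add_callback(), which is exactly the kind of manual bookkeeping I'd like to avoid.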
edit:

@dano OK, I see now... I was inattentive. Your solution really does not block, as get_input()'s own recursive call is inside the futs list.
But! The yield construction must be used, not add_callback, as otherwise we lose the "waiting" concept.

What I came up with today is a "metafuture". I create a bare Future() object.
I decorate every @coroutine-enabled method with my own decorator, which increments a counter field in the "metafuture" and adds a custom done callback to their futures that decrements it. When the counter reaches zero, the "metafuture" resolves by calling set_result(None).

There is also an IOLoop callback that yields exactly that metafuture:
@coroutine
def wait_for_complete(self):
    yield self._input_data_collected
    yield self._all_futures_resolved
    self.complete()
So after that we know no futures are pending. That's the hard way, like manually implementing refcounting, but it also covers tasks added via IOLoop.add_callback().
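Stripped down, my decorator and "metafuture" look roughly like this (a reconstruction of the idea, not the exact code):

from tornado.concurrent import Future

class MetaFuture(object):
    """Resolves once every tracked coroutine Future has resolved."""

    def __init__(self):
        self.future = Future()
        self.counter = 0

    def track(self, coro_func):
        # decorator applied to every @coroutine-enabled method
        def wrapper(*args, **kwargs):
            self.counter += 1
            inner = coro_func(*args, **kwargs)
            inner.add_done_callback(self._one_resolved)
            return inner
        return wrapper

    def _one_resolved(self, future):
        self.counter -= 1
        # caveat: if the first Future resolves before the next coroutine is
        # even scheduled, the counter hits zero too early; the real code has
        # to guard against that
        if self.counter == 0 and not self.future.done():
            self.future.set_result(None)

The self._all_futures_resolved that wait_for_complete() yields above would be the future field of such an object.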
You could write your methods so that they don't return until all the work is done, rather than scheduling callbacks. Then you could just call IOLoop.run_sync(start), and the call won't return until all the processing is complete:
from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def start():
    yield get_input()

@gen.coroutine
def get_input(*args, **kwargs):
    data = yield fetch_data_over_net()
    futs = []  # list of Future objects
    futs.append(process_input(data))
    if should_call_myself(data):
        futs.append(get_input(newargs, newkwargs))
    yield futs  # This will wait for all Future objects in the list to complete.

@gen.coroutine
def process_input(data):
    pass  # do stuff

if __name__ == "__main__":
    IOLoop.instance().run_sync(start)
We take advantage of the fact that coroutines return Futures, and that Tornado supports waiting for multiple Futures in parallel, so that we can run as much as possible concurrently without ever returning from get_input() (and therefore start()) before all the dependent work is done.
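For completeness, here is a toy, runnable version of the same pattern with the undefined helpers stubbed out (fetch_data_over_net() and the stop condition are invented for the demo):

from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def fetch_data_over_net(n):
    yield gen.sleep(0.1)  # pretend network latency
    raise gen.Return(n)

@gen.coroutine
def process_input(data):
    print("processed %s" % data)

@gen.coroutine
def get_input(n=0):
    data = yield fetch_data_over_net(n)
    futs = [process_input(data)]
    if n < 3:  # stand-in for should_call_myself()
        futs.append(get_input(n + 1))
    yield futs  # each level waits for its children before returning

@gen.coroutine
def start():
    yield get_input()

if __name__ == "__main__":
    IOLoop.instance().run_sync(start)
    print("run_sync returned: nothing is pending")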