Tags: python, asynchronous, callback, tornado, python-asyncio

Write back through the callback attached to IOLoop in Tornado


There is a tricky post handler: sometimes it takes a lot of time (depending on the input values), sometimes not.

What I want is to write back as soon as 1 second has passed, choosing the response dynamically.

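import datetime as dt

from tornado.ioloop import IOLoop

# Method of a tornado.web.RequestHandler subclass.
def post(self):
    def callback():
        # Runs on the IOLoop if the handler has not answered within 1 second.
        self.write('too-late')
        self.finish()

    timeout_obj = IOLoop.current().add_timeout(
        dt.timedelta(seconds=1),
        callback,
    )

    # some asynchronous operations

    if not self.request.connection.stream.closed():
        self.write('here is your response')
        self.finish()
        IOLoop.current().remove_timeout(timeout_obj)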

It turns out I can't do much from within the callback.

Even an exception raised there is suppressed by the inner context and is not propagated to the post method.

Any other ways to achieve the goal?

Thank you.

UPD 2020-05-15: I found a similar question

Thanks @ionut-ticus, using with_timeout() is much more convenient.

After some tries, I think I came really close to what I'm looking for:

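import datetime as dt
import logging
import time
from functools import wraps

from tornado import gen
from tornado.ioloop import IOLoop


def wait(fn):
    @gen.coroutine
    @wraps(fn)
    def wrap(*args):
        try:
            # Run the blocking function in the default executor and
            # stop waiting for it after 20 seconds.
            result = yield gen.with_timeout(
                dt.timedelta(seconds=20),
                IOLoop.current().run_in_executor(None, fn, *args),
            )
            raise gen.Return(result)
        except gen.TimeoutError:
            logging.error('### TOO LONG')
            raise gen.Return('Next time, bro')
    return wrap


@wait
def blocking_func(item):
    time.sleep(30)
    # this is not a Subprocess.
    # It is a file IO and DB
    return 'we are done here'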

  1. I'm still not sure: should the wrapper returned by the wait() decorator be a coroutine?

  2. Sometimes, somewhere in the chain of calls made by blocking_func(), there can be another ThreadPoolExecutor. My concern: would this work without making "my" executor global and passing it to Tornado's run_in_executor()?
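
Regarding the second point, here is a minimal sketch under stated assumptions, not a definitive answer. run_in_executor() takes the executor as its first argument, and None selects the IOLoop's default one. The names HANDLER_POOL and call_in_pool are hypothetical, and the nested pool inside blocking_func only stands in for a ThreadPoolExecutor created somewhere deeper in the call chain.

```python
import time
from concurrent.futures import ThreadPoolExecutor

from tornado import gen
from tornado.ioloop import IOLoop

# Hypothetical module-level pool reserved for the handler's blocking calls.
HANDLER_POOL = ThreadPoolExecutor(max_workers=4)


def blocking_func(item):
    # Stand-in for file IO / DB work that creates its own, separate pool.
    with ThreadPoolExecutor(max_workers=2) as inner_pool:
        doubled = inner_pool.submit(lambda: item * 2).result()
    time.sleep(0.05)  # simulate some more blocking work
    return doubled


@gen.coroutine
def call_in_pool(item):
    # Pass the explicit executor instead of None (the default executor).
    result = yield IOLoop.current().run_in_executor(
        HANDLER_POOL, blocking_func, item
    )
    raise gen.Return(result)
```

In this sketch the two pools are independent objects, so the inner one does not need to know about the outer one. Note also that with_timeout() only stops waiting for the result; the worker thread keeps running until the blocking function returns.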

Tornado: v5.1.1


Solution

  • Here is an example that uses tornado.gen.with_timeout. Keep in mind that the task must be asynchronous; otherwise the IOLoop is blocked and cannot process the timeout:

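    import datetime

    from tornado import gen

    # Both methods belong to a tornado.web.RequestHandler subclass.

    @gen.coroutine
    def async_task(self):
        # some async code; gen.sleep is only a placeholder that
        # yields to the IOLoop instead of blocking it
        yield gen.sleep(0.5)

    @gen.coroutine
    def get(self):
        delta = datetime.timedelta(seconds=1)
        try:
            task = self.async_task()
            result = yield gen.with_timeout(delta, task)
            self.write("success")
        except gen.TimeoutError:
            self.write("timeout")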
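
To try the same pattern outside a request handler, a self-contained sketch might look like the following. The helpers respond and run, the delay parameter, and the 0.2-second limit are assumptions made for illustration; gen.sleep stands in for real asynchronous work.

```python
import datetime

from tornado import gen
from tornado.ioloop import IOLoop


@gen.coroutine
def async_task(delay):
    # Hypothetical stand-in for real async work; gen.sleep yields to the IOLoop.
    yield gen.sleep(delay)
    raise gen.Return("success")


@gen.coroutine
def respond(delay, limit=0.2):
    # Return the task's result, or "timeout" if it takes longer than `limit` seconds.
    try:
        result = yield gen.with_timeout(
            datetime.timedelta(seconds=limit), async_task(delay)
        )
    except gen.TimeoutError:
        result = "timeout"
    raise gen.Return(result)


def run(delay):
    # Drive the coroutine to completion on the current IOLoop.
    return IOLoop.current().run_sync(lambda: respond(delay))
```

Calling run(0.01) should give "success" and run(0.5) should give "timeout", because the second task sleeps longer than the limit.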