asynchronous concurrency event-handling tornado future

Asynchronous task execution using Tornado web framework


I'm familiar with event-driven programming, but I have run into this problem and have exhausted the solutions I could think of. I read the Tornado documentation and tried:

  1. Futures
  2. gen.coroutine
  3. asynchronous
  4. add_timeout

but I was not able to solve the following problem:

  • I have a websocket server that just listens for new messages and, based on the message type, calls a specific function:

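    class WebSocketHandler(tornado.websocket.WebSocketHandler):

        ...

        def on_message(self, message):
            # Compare by value (==), not identity (is).
            if message['type'] == X:
                # WebSocketHandler sends frames with write_message, not write.
                self.write_message(functionA(message['data']))
            elif message['type'] == Y:
                self.write_message(functionB(message['data']))

        ...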
    

The problem arises when a computationally expensive function is executed. For example, functionA can take up to 5 minutes to finish:

    import json
    from subprocess import check_output, STDOUT

    def functionA(message):
        params = extract_params(message)
        cmd = "computationally_expensive_tool"
        # Blocks the calling thread until the external tool exits.
        out = check_output(cmd, shell=True, stderr=STDOUT, cwd=working_dir)
        ...
        return json.dumps({
            "error": False,
            "message": "computationally_expensive_tool_execution_terminated",
            "type": X
        })

My question is: how can I execute that function asynchronously, so that I can still handle other messages and then handle the result of functionA when it is ready?


Solution

  • If functionA is a blocking function that cannot be made asynchronous, you probably want to run it in a thread pool:

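    executor = concurrent.futures.ThreadPoolExecutor()

    @gen.coroutine
    def on_message(self, message):
        if message['type'] == X:
            # functionA runs in a worker thread, so the IOLoop stays free.
            result = yield executor.submit(functionA, message['data'])
            self.write_message(result)
        elif message['type'] == Y:
            self.write_message(functionB(message['data']))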
    

    This will block this websocket until functionA finishes, but allow other connections to continue working. If you need to continue processing other kinds of messages from the same connection while functionA runs, you need a more complicated arrangement, possibly involving a tornado.queues.Queue.
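
One possible shape for that more complicated arrangement is sketched below, using only the standard library so that it runs on its own. It is not Tornado code: `Connection`, `slow_job` and `fast_job` are hypothetical stand-ins for the handler, functionA and functionB. The idea is that `on_message` schedules the slow job as a background task and returns immediately, so later messages on the same connection are handled while the job runs in a worker thread. Since Tornado 5 runs on the asyncio event loop, the same pattern should carry over to a real handler, but treat that as an assumption to verify against your Tornado version.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)


def slow_job(data):
    # Hypothetical stand-in for functionA: blocks the calling thread.
    time.sleep(0.2)
    return "slow:" + data


def fast_job(data):
    # Hypothetical stand-in for functionB: returns immediately.
    return "fast:" + data


class Connection:
    """Hypothetical stand-in for one websocket connection."""

    def __init__(self):
        self.sent = []        # replies, in the order they were sent
        self.pending = set()  # references to background tasks still running

    def write_message(self, reply):
        self.sent.append(reply)

    async def _run_slow(self, data):
        loop = asyncio.get_running_loop()
        # The blocking call runs in a worker thread; the event loop stays free.
        reply = await loop.run_in_executor(executor, slow_job, data)
        self.write_message(reply)

    def on_message(self, message):
        if message["type"] == "X":
            # Schedule the slow job and return at once instead of waiting.
            task = asyncio.ensure_future(self._run_slow(message["data"]))
            self.pending.add(task)
            task.add_done_callback(self.pending.discard)
        elif message["type"] == "Y":
            self.write_message(fast_job(message["data"]))


async def demo():
    conn = Connection()
    conn.on_message({"type": "X", "data": "a"})  # slow, sent first
    conn.on_message({"type": "Y", "data": "b"})  # fast, sent second
    # Wait for the background job before inspecting the replies.
    await asyncio.gather(*conn.pending)
    return conn.sent


def run_demo():
    return asyncio.run(demo())
```

Calling `run_demo()` returns `['fast:b', 'slow:a']`: the reply to the second message goes out before the slow job sent first has finished. The trade-off is that replies can arrive out of order, so the client needs some way to match each reply to its request, such as the `type` field in the original messages.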