multithreading tornado

tornado server is incompatible with threading module


I'm using tornado with threads.

In short, each time the websocket handler receives a request, it starts executing a task, which might take a few minutes.

However, once a client is connected, no other client can connect until the first one disconnects.

Any ideas?

I've attached a minimal example that uses time.sleep to simulate long running tasks.

import tornado.web
import tornado.websocket
import tornado.httpserver
import tornado.ioloop
import time
import json
import threading

class TaskHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        pass

    def check_origin(self, origin):
        return True

    def on_message(self, message):
        try:
            print 'received: ', message
            self.write_message(json.dumps({'status': 'running'}))

            def worker_A(kwargs):
                time.sleep(100)
                pass

            def worker_B(kwargs):
                time.sleep(100)
                pass

            threads = []
            for target in [worker_A, worker_B]:
                t = threading.Thread(target = target, args = ({'xxx': 'yyy'}, ))
                t.daemon = True
                t.start()
                threads.append(t)

            for t in threads:
                t.join()

        except Exception, e:
            print 'TaskHandler: exception: ', e
            pass

        self.write_message(json.dumps({'status': 'done'}))

    def on_close(self):
        pass

class Server(tornado.web.Application):
    def __init__(self):
        handlers = [
            ('/task', TaskHandler),
        ]

        tornado.web.Application.__init__(self, handlers)

if __name__ == '__main__':
    server  = tornado.httpserver.HTTPServer(Server())
    server.listen(8765, address = '127.0.0.1')
    tornado.ioloop.IOLoop.instance().start()

Solution

  • Calling t.join in on_message blocks the whole Tornado event loop for the 100 seconds the workers sleep. A function is only asynchronous if it yields, or if it schedules a callback and returns. Your on_message starts two threads and then calls t.join on each, which does neither. The event loop runs on the same thread as on_message, so while that function waits in t.join the loop cannot accept new connections or do any other work.

    Instead, use a ThreadPoolExecutor, something like this:

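    # json, time and tornado.websocket are imported as in the question.
    from concurrent.futures import ThreadPoolExecutor
    from tornado import gen
    from tornado.ioloop import IOLoop

    thread_pool = ThreadPoolExecutor(4)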
    
    class TaskHandler(tornado.websocket.WebSocketHandler):
        # Make this an asynchronous coroutine
        @gen.coroutine
        def on_message_coroutine(self, message):
            print 'received: ', message
            self.write_message(json.dumps({'status': 'running'}))
    
            def worker_A(kwargs):
                time.sleep(100)
                pass
    
            def worker_B(kwargs):
                time.sleep(100)
                pass
    
            futures = []
            for target in [worker_A, worker_B]:
                f = thread_pool.submit(target, {'xxx': 'yyy'})
                futures.append(f)
    
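            # Yielding the list suspends this coroutine until every future
            # has finished; meanwhile the event loop can do other things.
            yield futures

            # As in the question, report completion once the workers are done.
            self.write_message(json.dumps({'status': 'done'}))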
    
        def on_message(self, message):
            IOLoop.current().spawn_callback(self.on_message_coroutine,
                                            message)
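
The difference between joining and yielding can be checked without a running server. The following is a minimal sketch, not the Tornado code above: it assumes Python 3.7+ and swaps in the standard library's asyncio event loop so that it runs with no extra packages. All names (slow_task, heartbeat, blocking_handler, cooperative_handler, count_ticks, run) are hypothetical, and the 100-second sleep is shortened to 0.3 seconds. A heartbeat task counts how often the event loop gets to run while a handler is busy.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_task(seconds):
    """Stand-in for worker_A / worker_B: blocks the thread it runs on."""
    time.sleep(seconds)
    return seconds


async def heartbeat(ticks, interval=0.05):
    """Append a timestamp every time the event loop wakes this task."""
    while True:
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def blocking_handler(pool):
    # Same shape as the question's on_message: start the work, then wait
    # synchronously. f.result() blocks the event-loop thread like t.join().
    futures = [pool.submit(slow_task, 0.3) for _ in range(2)]
    for f in futures:
        f.result()


async def cooperative_handler(pool):
    # Same idea as the answer: hand the work to the pool, then suspend
    # until it is done so the loop can keep serving other tasks.
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(pool, slow_task, 0.3) for _ in range(2)]
    await asyncio.gather(*futures)


async def count_ticks(handler):
    """Return how many heartbeats fired while the handler was running."""
    ticks = []
    with ThreadPoolExecutor(4) as pool:
        beat = asyncio.ensure_future(heartbeat(ticks))
        await asyncio.sleep(0)  # give the heartbeat a chance to start
        await handler(pool)
        fired = len(ticks)
        beat.cancel()
    return fired


def run(handler):
    return asyncio.run(count_ticks(handler))


if __name__ == '__main__':
    print('blocking handler, heartbeats:', run(blocking_handler))
    print('cooperative handler, heartbeats:', run(cooperative_handler))
```

Under these assumptions the blocking handler lets no heartbeat through, while the cooperative one lets several through in the same wall-clock time. The same starvation is what keeps a second websocket client from connecting in the question.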