
Tornado concurrency errors running multiple processes together with a process pool executor


I'm trying to run multiple server processes and, at the same time, use a concurrent.futures.ProcessPoolExecutor to run CPU-intensive jobs. The first few requests are served happily, but then a KeyError is raised from concurrent.futures.process and the server hangs.

Is this a bug in Tornado?

This is the simplest version I could strip the code down to.

server:

# -*- coding: utf-8 -*-
"""
server runs 2 processes and does job on a ProcessPoolExecutor
"""
import tornado.web
import tornado.ioloop
import tornado.gen
import tornado.options
import tornado.httpserver

from concurrent.futures import ProcessPoolExecutor


class MainHandler(tornado.web.RequestHandler):

    executor = ProcessPoolExecutor(1)

    @tornado.gen.coroutine
    def post(self):
        num = int(self.request.body)
        result = yield self.executor.submit(pow, num, 2)
        self.finish(str(result))


application = tornado.web.Application([
    (r"/", MainHandler),
])


def main():
    tornado.options.parse_command_line()
    server = tornado.httpserver.HTTPServer(application)
    server.bind(8888)
    server.start(2)
    tornado.ioloop.IOLoop.instance().start()


if __name__ == '__main__':
    main()

client:

# -*- coding: utf-8 -*-
"""
client
"""
from tornado.httpclient import AsyncHTTPClient
from tornado.gen import coroutine
from tornado.ioloop import IOLoop


@coroutine
def remote_compute(num):
    rsp = yield AsyncHTTPClient().fetch(
        'http://127.0.0.1:8888', method='POST', body=str(num))
    print 'result:', rsp.body


IOLoop.instance().run_sync(lambda: remote_compute(10))

error traceback:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.7_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python/2.7.7_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/Users/cliffxuan/.virtualenvs/executor/lib/python2.7/site-packages/concurrent/futures/process.py", line 216, in _queue_management_worker
    work_item = pending_work_items[result_item.work_id]
KeyError: 0

Solution

  • This is caused by the interaction between Tornado and concurrent.futures when you start the Tornado server with multiple processes via server.start(2), which internally uses os.fork() to create the two processes. Because you declare the Executor as a class variable, it is instantiated when the MainHandler class body is executed, before server.start() actually runs. That means both processes end up sharing a single (albeit forked) ProcessPoolExecutor instance. This leads to some oddities: each process gets copy-on-write versions of most data structures inside the Executor, but they end up actually sharing the same worker processes.

    ProcessPoolExecutor doesn't support being shared between processes like this, so you get the problems you see when the second process tries to use the Executor. You can work around it by creating the Executor only after the fork has occurred:

    class MainHandler(tornado.web.RequestHandler):
        executor = None # None for now
    
        @tornado.gen.coroutine
        def post(self):
            num = int(self.request.body)
            result = yield self.executor.submit(pow, num, 2)
            self.finish(str(result))
    
    
    application = tornado.web.Application([
        (r"/", MainHandler),
    ])
    
    
    def main():
        tornado.options.parse_command_line()
        server = tornado.httpserver.HTTPServer(application)
        server.bind(8888)  # same port the client talks to
        server.start(2) # We fork here
        MainHandler.executor = ProcessPoolExecutor(1) # Now we can create the Executor
        tornado.ioloop.IOLoop.instance().start()
    
    
    if __name__ == '__main__':
        main()
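The underlying rule can be illustrated without Tornado at all. The following is a minimal sketch (hypothetical, plain os.fork on a POSIX system, not part of the original answer) of the same fix: each forked child creates its own ProcessPoolExecutor *after* the fork, so its queues and worker processes are private to that child:

```python
import os
from concurrent.futures import ProcessPoolExecutor


def child_work():
    # The pool is created *after* the fork, so its management thread,
    # queues, and worker processes belong to this process alone.
    executor = ProcessPoolExecutor(1)
    result = executor.submit(pow, 10, 2).result()
    executor.shutdown()
    return result


if __name__ == '__main__':
    pid = os.fork()
    if pid == 0:
        # Child: run a job on its own private pool; exit 0 on success.
        os._exit(0 if child_work() == 100 else 1)
    # Parent: wait for the child and report whether it succeeded.
    _, status = os.waitpid(pid, 0)
    print('child result ok:', os.WEXITSTATUS(status) == 0)
```

Had the executor been created before os.fork(), parent and child would share the same worker processes and result queue, which is exactly how the KeyError in the traceback arises.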