Search code examples
python-3.xtornadopython-3.5

Starting multiprocessed TCPServer and HTTPServer in Tornado/Python3?


I have two Tornado servers running in the same Python process. One is a TCPServer, while the other is an HTTPServer. The TCPServer receives data from some source, and then the HTTPServer broadcasts that data out to any clients connected to the served webpage. I have the two servers communicating data between each other with a tornado.queues.Queue(), wherein the TCPServer puts data into the queue, and HTTPServer gets data from it and sends it off to webpage clients.

The question is, how do I make this take advantage of multiple cores? Data is only coming in from once source, so it makes sense to dedicate only 1 core to the TCPServer, but since there can be arbitrarily many clients I'd like to spin up a number of processes each with their own instance of HTTPServer to serve clients.

From the Tornado documentation, I see that if all of the servers were unrelated, I'd just have to do something like this:

def main():
    app = make_app()
    server = tornado.httpserver.HTTPServer(app)
    server.bind(8888)
    server.start(0)  # forks one process per cpu
    IOLoop.current().start()

However, since I have two servers running in a single process, I can't quite mirror that. When I try to do that, I get an error saying that:

Traceback (most recent call last):
  File "./tcp_server.py", line 28, in <module>
    http_server.start(0)
  File "/usr/local/lib/python3.5/dist-packages/tornado/tcpserver.py", line 205, in start
    process.fork_processes(num_processes)
  File "/usr/local/lib/python3.5/dist-packages/tornado/process.py", line 130, in fork_processes
    raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
RuntimeError: Cannot run in multiple processes: IOLoop instance has already been initialized. You cannot call IOLoop.instance() before calling start_processes()

How can I reorganize the startup procedure so that I have the TCPServer in one process, and the multiple instances of the HTTPServer on all the other processes?

import logging
import tornado
import tornado.queues
from buggy_data_server import BuggyDataServer
from buggy_http_server import BuggyHttpServer

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    data_queue = tornado.queues.Queue()

    # Currently, both servers are started in a single process. This needs to be
    # split up into a multiprocess configuration later.
    data_sock = tornado.netutil.bind_sockets(4242)
    data_server = BuggyDataServer(data_queue) # TCPServer
    data_server.add_sockets(data_sock)

    http_sock = tornado.netutil.bind_sockets(8080)
    http_server = BuggyHttpServer(data_queue) # HTTPServer
    http_server.add_sockets(http_sock)
    http_server.start(0)

    tornado.ioloop.IOLoop.instance().start()

Solution

  • This falls under the "advanced multi-process" category of the TCPServer docs. You need to call fork_processes explicitly instead of letting one of the servers do it. Reorder your setup calls so that you bind all the sockets before the fork, and add them to the servers after (as a rule of thumb, you want to do as little as possible before the fork):

    data_sock = tornado.netutil.bind_sockets(4242)
    http_sock = tornado.netutil.bind_sockets(8080)
    
    tornado.process.fork_processes(0)
    
    data_server = BuggyDataServer(data_queue) # TCPServer
    data_server.add_sockets(data_sock)
    http_server = BuggyHttpServer(data_queue) # HTTPServer
    http_server.add_sockets(http_sock)
    

    This starts an HTTP server and a TCP server in each process. If you need to only run one TCP server (and not run an HTTP server in that process), you can look at the return value of fork_processes:

    http_sock = tornado.netutil.bind_sockets(8080)
    proc_num = tornado.process.fork_processes(0)
    
    if proc_num == 0:
        # Close the HTTP sockets in the TCP server's process
        for s in http_sock: s.close()
        BuggyDataServer(data_queue).listen(4242)
    else:
        BuggyHttpServer(data_queue).add_sockets(http_sock)