
Tornado TCP Server / Client process communication


I want to set up communication between a number of Tornado processes, each acting as a web server (i.e. using tornado.web.RequestHandler). The idea is that I want a fully meshed network between the processes. I have 4 processes and I want to establish ongoing, permanent communication between them using tornado.tcpserver and tornado.tcpclient:

T1---T2
| \  /| 
|  \/ |
| / \ |
T3---T4
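For scale: a full mesh of n nodes has n(n-1)/2 links, so the four processes in the diagram need six. If every process both dials out to each peer and accepts connections from each peer, every pair ends up with two one-way TCP connections, n(n-1) in total. A small counting sketch, using the diagram's labels as hypothetical node names:

```python
from itertools import combinations, permutations

# Hypothetical node names taken from the diagram above.
nodes = ["T1", "T2", "T3", "T4"]

# Undirected links in a full mesh: one per unordered pair, n*(n-1)/2 in total.
links = list(combinations(nodes, 2))

# If each process dials every peer AND accepts from every peer,
# there is one connection per ordered pair: n*(n-1) in total.
dialed = list(permutations(nodes, 2))

for a, b in links:
    print(f"{a}---{b}")
print(len(links), "links,", len(dialed), "directed connections")
```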

I'm new to TCP programming. However, in the example I've seen in the Tornado documentation (http://www.tornadoweb.org/en/stable/iostream.html, under "Implementations" for class tornado.iostream.IOStream), once a socket is established all the communication is done and then the socket is closed. The example drives the code through a chain of callbacks, each performing one step of the communication.

However, is it possible to open a TCP connection and have BaseIOStream.read_until_close() sit idle, being called only when the client writes to the server?

In other words, the client and server stay connected, and when the client writes to the server it somehow interrupts the Tornado IOLoop to call read()?

Or is my thinking misguided, and the way to do this is to establish a new TCP connection every time the processes need to communicate, do the work, and then close the connection? It just seems like establishing a new connection every time would incur a lot of overhead compared with leaving the connection open...


Solution

  • Here's a basic implementation. (I can't promise it's production-quality!) Save it to a file and execute something like this, each in a different terminal window:

    > python myscript.py 10001 10002 10003
    > python myscript.py 10002 10003 10001
    > python myscript.py 10003 10001 10002
    

    The first argument is the listening port, the remaining arguments are the ports of the other servers.
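The commands above just rotate the list of ports so that each process sees its own port first. For the four-process mesh in the question, a small helper can generate the command lines; the starting port 10001 and the file name myscript.py are assumptions carried over from the example, and `mesh_commands` is a hypothetical name.

```python
def mesh_commands(n, base_port=10001, script="myscript.py"):
    """Build one command line per process by rotating the port list.

    Assumption: ports are consecutive starting at base_port.
    """
    ports = [base_port + i for i in range(n)]
    commands = []
    for i in range(n):
        rotated = ports[i:] + ports[:i]  # own listening port first, then peers
        commands.append(" ".join(["python", script] + [str(p) for p in rotated]))
    return commands


for cmd in mesh_commands(4):
    print(cmd)
```

Each printed line goes in its own terminal window; mesh_commands(3) reproduces the three commands shown above.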

    import argparse
    import logging
    import os
    import random
    import struct
    
    from tornado import gen
    from tornado.ioloop import IOLoop
    from tornado.iostream import IOStream, StreamClosedError
    from tornado.tcpclient import TCPClient
    from tornado.tcpserver import TCPServer
    from tornado.options import options as tornado_options
    
    
    parser = argparse.ArgumentParser()
    parser.add_argument("port", type=int, help="port to listen on")
    parser.add_argument("peers", type=int, nargs="+", help="peers' ports")
    opts = parser.parse_args()
    
    # This is just to configure Tornado logging.
    tornado_options.parse_command_line()
    logger = logging.getLogger(os.path.basename(__file__))
    logger.setLevel(logging.INFO)
    
    # Length prefix: a 4-byte unsigned int in network (big-endian) byte order.
    # Compile the struct once and reuse its pack/unpack methods.
    int_struct = struct.Struct("!I")
    _UNPACK_INT = int_struct.unpack
    _PACK_INT = int_struct.pack
    
    tcp_client = TCPClient()
    
    
    @gen.coroutine
    def client(port):
        while True:
            try:
                stream = yield tcp_client.connect('localhost', port)
                logger.info("Connected to %d", port)
    
                # Set TCP_NODELAY / disable Nagle's Algorithm.
                stream.set_nodelay(True)
    
                while True:
                    msg = ("Hello from port %d" % opts.port).encode()
                    length = _PACK_INT(len(msg))
                    yield stream.write(length + msg)
                    yield gen.sleep(random.random() * 10)
    
            except StreamClosedError as exc:
                logger.error("Connection to %d failed or was closed: %s", port, exc)
                yield gen.sleep(5)
    
    
    loop = IOLoop.current()
    
    for peer in opts.peers:
        loop.spawn_callback(client, peer)
    
    
    class MyServer(TCPServer):
        @gen.coroutine
        def handle_stream(self, stream, address):
            logger.info("Connection from peer %s", address)
            try:
                while True:
                    # Read 4 bytes.
                    header = yield stream.read_bytes(4)
    
                    # Convert from network order to int.
                    length = _UNPACK_INT(header)[0]
    
                    msg = yield stream.read_bytes(length)
                    logger.info('"%s"', msg.decode())
    
                    del msg  # Dereference msg in case it's big.
    
            except StreamClosedError:
                logger.error("%s disconnected", address)
    
    
    server = MyServer()
    server.listen(opts.port)
    
    loop.start()
    

    Notice that we don't call read_until_close, so we need some way to know when a message is completely received. I do this with a 32-bit integer at the beginning of each message which encodes the length of the rest of the message.
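The length prefix matters because TCP is a byte stream: one write may arrive split across several reads, or two writes may arrive in one read. A standalone sketch of the same framing, with no Tornado involved, shows how a receiver reassembles whole messages from arbitrary chunks, which is what the two read_bytes calls accomplish in the script. The `!I` header format is an assumption (both ends only need to agree), and `encode_frame` and `FrameDecoder` are hypothetical names.

```python
import struct

# 4-byte unsigned length prefix in network (big-endian) byte order (assumed format).
HEADER = struct.Struct("!I")


def encode_frame(payload: bytes) -> bytes:
    """Prefix the payload with its length."""
    return HEADER.pack(len(payload)) + payload


class FrameDecoder:
    """Reassemble whole messages from arbitrarily split TCP chunks."""

    def __init__(self):
        self._buf = bytearray()

    def feed(self, chunk: bytes) -> list:
        self._buf.extend(chunk)
        frames = []
        # Keep extracting while at least a full header is buffered.
        while len(self._buf) >= HEADER.size:
            (length,) = HEADER.unpack_from(self._buf)
            end = HEADER.size + length
            if len(self._buf) < end:
                break  # payload has not fully arrived yet
            frames.append(bytes(self._buf[HEADER.size:end]))
            del self._buf[:end]
        return frames


wire = encode_frame(b"Hello from port 10001") + encode_frame(b"hi")
decoder = FrameDecoder()
received = []
# Feed the bytes in awkward 3-byte pieces, as TCP is free to deliver them.
for i in range(0, len(wire), 3):
    received.extend(decoder.feed(wire[i:i + 3]))
print(received)  # [b'Hello from port 10001', b'hi']
```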

    You asked, "when the client writes to the server it somehow interrupts the Tornado IOLoop to call the read()?" This is what Tornado's IOLoop is for, and it's what we mean by "async": many Tornado coroutines or callbacks can wait for network events, and the IOLoop wakes them when the events they're awaiting occur. That's what's happening wherever you see "yield" in the code above.
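Since Tornado 5, on Python 3 the IOLoop wraps asyncio's event loop, so the same behaviour can be seen with the standard library alone. This sketch swaps Tornado's TCPServer/TCPClient for asyncio streams (asyncio.start_server, asyncio.open_connection, StreamReader.readexactly) only to stay dependency-free; handle_peer and main are illustrative names. One connection stays open across idle gaps, and the server coroutine is simply suspended at each await until data arrives.

```python
import asyncio
import struct

HEADER = struct.Struct("!I")  # same 4-byte length prefix (assumed format)
received = []


async def handle_peer(reader, writer):
    # One long-lived connection: this coroutine is suspended at each
    # await until the event loop sees data on the socket.
    try:
        while True:
            header = await reader.readexactly(HEADER.size)
            (length,) = HEADER.unpack(header)
            received.append(await reader.readexactly(length))
    except asyncio.IncompleteReadError:
        pass  # peer closed the connection
    finally:
        writer.close()


async def main():
    server = await asyncio.start_server(handle_peer, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    for text in ("first", "second", "third"):
        msg = text.encode()
        writer.write(HEADER.pack(len(msg)) + msg)
        await writer.drain()
        await asyncio.sleep(0.1)  # idle gap; the server side just waits
    writer.close()
    await writer.wait_closed()
    await asyncio.sleep(0.1)  # let the handler observe EOF
    server.close()
    await server.wait_closed()


asyncio.run(main())
print(received)  # [b'first', b'second', b'third']
```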