Search code examples
pythontcptornado

How to process multiple commands to read from socket at the same time in Tornado asynchronous TCP?


My TCP server is made with Tornado's asynchronous TCP. The client is written in C.

Server code:

#! /usr/bin/env python
#coding=utf-8

from tornado.tcpserver import TCPServer
from tornado.ioloop import IOLoop

class TcpConnection(object):
    def __init__(self,stream,address):
        self._stream=stream
        self._address=address
        self._stream.set_close_callback(self.on_close)
        self.send_message(b'hello \n')
        self.send_message(b'world \n')

    def read_message(self):
        self._stream.read_until(b'\n', self.handle_message)

    def handle_message(self,data):
        print(data)

    def send_message(self,data):
        self._stream.write(data)
        self.read_message()

    def on_close(self):
        print("the monitored %d has left",self._address)

class MonitorServer(TCPServer):
    def handle_stream(self,stream,address):
        print("new connection",address,stream)
        TcpConnection(stream,address)

if  __name__=='__main__':
    print('server start .....')
    server=MonitorServer()
    server.listen(20000)
    IOLoop.instance().start()

Client code:

#include <Winsock2.h>
#include <stdio.h>
#pragma comment(lib, "ws2_32.lib")

typedef struct SytemInit
{
    char computer[32];      
    char user[32];              
    char os[256];               
    char processor[256];    
    char mem[128];          
    char disk[128];             
}SYSTEMINIT;

typedef struct Command
{
    int  commandType;                             
    char commandInfo[256];                 
}COMMAND;

void main()
{
    int err;
    SYSTEMINIT message;
    COMMAND recvBuf;

    SOCKET sockClient; 
    SOCKADDR_IN addrServer; 

    WSADATA wsaData;
    WORD wVersionRequested;

    wVersionRequested = MAKEWORD( 2, 2 );

    err = WSAStartup( wVersionRequested, &wsaData );

    if ( err != 0 )
    {
        return;
    }

    if ( LOBYTE( wsaData.wVersion ) != 2 || HIBYTE( wsaData.wVersion ) != 2 )
    {
        WSACleanup( );
        return;
    }

    sockClient = socket(AF_INET, SOCK_STREAM, 0);

    addrServer.sin_addr.S_un.S_addr = inet_addr("172.16.110.1");  
    addrServer.sin_family = AF_INET;                           
    addrServer.sin_port = htons(20000);                         

    connect(sockClient, (SOCKADDR *)&addrServer, sizeof(SOCKADDR));

    recv(sockClient, (char*)&recvBuf, 100, 0);

    strcpy(message.computer,"zz-pc");
    strcpy(message.disk,"zz-disk");
    strcpy(message.mem,"zz-men");
    strcpy(message.os,"zz-os");
    strcpy(message.processor,"zz-processor");
    strcpy(message.user,"zz-user");

    send(sockClient, (char*)&message, sizeof(message) + 1, 0);

    closesocket(sockClient);
    WSACleanup();
}

I get the following error when I execute them:

ERROR:tornado.application:Error in connection callback
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/tornado/tcpserver.py", line 269, in _handle_connection
    future = self.handle_stream(stream, address)
  File "/home/zz/PycharmProjects/monitor/test.py", line 34, in handle_stream
    TcpConnection(stream,address)
  File "/home/zz/PycharmProjects/monitor/test.py", line 15, in __init__
    self.send_message(b'world \n')
  File "/home/zz/PycharmProjects/monitor/test.py", line 25, in send_message
    self.read_message()
  File "/home/zz/PycharmProjects/monitor/test.py", line 18, in read_message
    self._stream.read_until(b'\n', self.handle_message)
  File "/usr/local/lib/python3.4/dist-packages/tornado/iostream.py", line 270, in read_until
    future = self._set_read_callback(callback)
  File "/usr/local/lib/python3.4/dist-packages/tornado/iostream.py", line 658, in _set_read_callback
    assert self._read_callback is None, "Already reading"
AssertionError: Already reading

I guess this error is because self.send_message(b'hello \n') and self.send_message(b'world \n') read from the socket at the same time. How can I solve this?


Solution

  • This code of yours:

    self.send_message(b'hello \n')
    self.send_message(b'world \n')
    

    results in the following:

    self._stream.write(b'hello \n')
    self._stream.read_until(b'\n', self.handle_message)
    self._stream.write(b'world \n')
    self._stream.read_until(b'\n', self.handle_message)
    

    Since you're calling read_until with a callback, you're trying to do both read_untils in parallel at the same time. That's nonsense, though, because they're coming one after the other over the TCP connection. You have to first read one message, and then read the other message.

    I feel that using gen.coroutine would make this easier. You can also do it with callbacks; I'll show how later.

    Using gen.coroutine

    Here's how I'd change your TcpConnection class with coroutines:

    class TcpConnection(object):
        def __init__(self,stream,address):
            self._stream=stream
            self._address=address
            self._stream.set_close_callback(self.on_close)
    
        @gen.coroutine
        def send_messages(self):
            yield self.send_message(b'hello \n')
            response1 = yield self.read_message()
            print(response1)
            yield self.send_message(b'world \n')
            # You can receive the result in-line, but you need to wrap with ( ):
            print((yield self.read_message()))
    
        def read_message(self):
            return self._stream.read_until(b'\n')
    
        def send_message(self,data):
            return self._stream.write(data)
    
        def on_close(self):
            print("the monitored %d has left",self._address)
    
    class MonitorServer(TCPServer):
        @gen.coroutine
        def handle_stream(self,stream,address):
            print("new connection",address,stream)
            conn = TcpConnection(stream,address)
            yield conn.send_messages()
    

    By using coroutines, you can write your code in the order you want it to execute in, and you can read the response like a return value into a local variable, instead of having to use a handler method. Whenever you yield something, you're pausing to wait for it to finish.

    I also separated send_message() and receive_message(), because I think it makes it clearer. If you think it's better to keep them together in send_message(), you can do that with something like this:

    @gen.coroutine
    def send_message(self,data):
        yield self._stream.write(data)
        return (yield self.receive_message())
    

    If instead you wanted to send both messages first, and then wait to receive both responses, you could also do that:

    @gen.coroutine
    def send_messages(self):
        yield self.send_message(b'hello \n')
        yield self.send_message(b'world \n')
        print((yield self.read_message()))
        print((yield self.read_message()))
    

    Using callbacks

    Anything you can code with coroutines, you can code with callbacks. What you need to do, however, is keep track of your state (where you're at) between the callbacks. This can be done by jumping around between different callbacks. For example:

    def send_first_message(self):
        self.send_message(b'hello \n', self.receive_first_response)
    
    def receive_first_response(self, data):
        print(data)
        self.send_message(b'world \n', self.receive_second_response)
    
    def receive_second_response(self, data):
        print(data)
    
    def read_message(self, callback):
        self._stream.read_until(b'\n', callback)
    
    def send_message(self, data, callback):
        self._stream.write(data)
        self.read_message(callback)
    

    Or by some other way of keeping track of where you are in the communication, such as storing something in a field of your class instance.