
Twisted protocol to handle concatenated TCP stream with custom frame structure?


I was given a job to build a long-pull TCP socket server as a device server, and I selected Twisted for development. It works fine with my Python device simulator. However, the real device sends out concatenated (or combined) TCP packets. I know that is normal for real networks and devices, even though the TCP packets are quite short.

It has three frame structures:

\xFDAA + "realtime_data" + \xCCDD (length is fixed at 150B)

\xFDCC + "extra_data" + \xCCDD (length is fixed at 190B)

\xFDCC + "extra_data" + \xCCDD (length is fixed at 192B)

Obviously, \xFDAA and \xFDCC are headers, and \xCCDD is the EOT marker, so the frames do have boundaries. They also have implied fixed lengths, although those lengths are not defined in the protocol itself.

However, I have no idea how to handle these concatenated packets of a custom frame type with existing Twisted methods. During development, I used dataReceived.

So far I have been parsing the packets and storing them in a buffer in the protocol's Factory. When each new packet arrives, I combine the previously buffered data with the new data and parse the result (joining them if a frame was split, separating them if combined packets were received...). But that seems dirty.

I have checked the FAQ on twistedmatrix.com. It recommends the following solutions:

LineReceiver (with \r\n ending chars) 
NetstringReceiver (with callback for every string received) 
Int8/16/32Receiver (with prefix length information)

AMP and PB high-level messaging are also recommended.

I would like to hear any suggestions from Twisted experts on how to implement this the official Twisted way. A URL or demo code would be very helpful.


Solution

  • None of LineReceiver, NetstringReceiver, Int8/16/32Receiver, AMP, or PB is applicable to your problem, as they are all implementations of specific framing (and, in the case of the latter two, messaging) protocols. You, instead, have a custom protocol you want to implement.

    Luckily this is relatively straightforward: Twisted delivers you data via your IProtocol implementations' dataReceived method.

    The best way to deal with this sort of thing is actually to implement a simple function first, rather than worrying about exactly how it is going to be plugged into Twisted. In your case, you need a function that parses your protocol; but, since dataReceived might deliver you a partial packet, you need to ensure that the function returns two things: the parsed data and any remaining buffer. Once you have such a function, you can plug it into a Protocol subclass quite easily.
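A minimal, generic sketch of that kind of function, assuming fixed-size frames. The name split_frames and the 4-byte frame size are illustrative choices only, not part of Twisted or of the device's protocol. It returns the complete frames plus the leftover bytes, and the loop imitates dataReceived being called with arbitrarily split chunks:

```python
from typing import List, Tuple


def split_frames(buffer: bytes, frame_size: int) -> Tuple[List[bytes], bytes]:
    """Split buffer into complete fixed-size frames plus leftover bytes."""
    frames = []
    offset = 0
    # Take as many whole frames as the buffer currently holds.
    while len(buffer) - offset >= frame_size:
        frames.append(buffer[offset:offset + frame_size])
        offset += frame_size
    # Whatever is left is an incomplete frame; keep it for the next call.
    return frames, buffer[offset:]


# Simulate TCP delivering 4-byte frames in awkward chunks.
pending = b""
received = []
for chunk in [b"ABCDEF", b"GH", b"IJ"]:
    frames, pending = split_frames(pending + chunk, 4)
    received.extend(frames)

print(received, pending)  # [b'ABCD', b'EFGH'] b'IJ'
```

The caller owns the leftover bytes and prepends them to the next chunk, which is exactly the state a Protocol instance would keep between dataReceived calls.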

    Your explanation of the protocol wasn't very clear, so this might not be quite correct, but I interpreted your description of the message format to be like this:

    octet 0xFD
    octet 0xAA
    150 octets of "realtimeData"
    octet 0xCC
    octet 0xDD
    octet 0xFD
    octet 0xCC
    190 octets of "extraData1"
    octet 0xCC
    octet 0xDD
    octet 0xFD
    octet 0xCC
    192 octets of "extraData2"
    octet 0xCC
    octet 0xDD
    

    In other words, a single protocol message is 544 bytes long and contains 3 fields plus 12 bytes of fixed marker octets (headers and terminators) that must match exactly.
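The size arithmetic can be checked mechanically with the standard library's struct.calcsize. In this small sketch, LAYOUT is a name chosen here that encodes the interpretation above on one line. The leading "!" selects network byte order with no alignment padding, so the size is just the sum of the fields:

```python
from struct import calcsize

# Field layout from the interpretation above, in network byte order.
LAYOUT = "!2s150s4s190s4s192s2s"

marker_bytes = 2 + 4 + 4 + 2       # FD AA | CC DD FD CC | CC DD FD CC | CC DD
payload_bytes = 150 + 190 + 192    # realtimeData + extra1 + extra2

print(marker_bytes, payload_bytes, calcsize(LAYOUT))  # 12 532 544
```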

    So let's first write a Message class that represents a message with these three fields, using the standard library struct module to parse and serialize its fields:

    from struct import Struct
    
    class Message(object):
        format = Struct(
            "!" # Network endian; always good form.
            "2s" # FD AA
            "150s" # realtimeData
            "4s" # CC DD FD CC
            "190s" # extra1
            "4s" # CC DD FD CC
            "192s" # extra2
            "2s" # CC DD
        )
    
        def __init__(self, realtimeData, extra1, extra2):
            self.realtimeData = realtimeData
            self.extra1 = extra1
            self.extra2 = extra2
    
        def toBytes(self):
            return self.format.pack(
                b"\xFD\xAA", self.realtimeData, b"\xCC\xDD\xFD\xCC", self.extra1,
                b"\xCC\xDD\xFD\xCC", self.extra2, b"\xCC\xDD"
            )
    
        @classmethod
        def fromBytes(cls, octets):
            [fdaa, realtimeData, ccddfdcc, extra1, ccddfdcc2, extra2,
             ccdd] = cls.format.unpack(octets)
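            # Verify message integrity. Explicit checks rather than assert,
            # because asserts are skipped when Python runs with -O.
            if (fdaa != b"\xFD\xAA" or ccddfdcc != b"\xCC\xDD\xFD\xCC"
                    or ccddfdcc2 != b"\xCC\xDD\xFD\xCC" or ccdd != b"\xCC\xDD"):
                raise ValueError("message framing markers do not match")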
            return cls(realtimeData, extra1, extra2)
    
        @classmethod
        def parseStream(cls, streamBytes):
            sz = cls.format.size
            messages = []
            while len(streamBytes) >= sz:
                messageData, streamBytes = streamBytes[:sz], streamBytes[sz:]
                messages.append(cls.fromBytes(messageData))
            return messages, streamBytes
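
parseStream re-slices streamBytes after each message, which copies the remaining tail every time. If a single dataReceived call can carry many messages, a variant that advances an offset with Struct.unpack_from and copies the leftover bytes only once may be worth considering. Here is a sketch under that assumption. parse_stream and FORMAT are illustrative names, and the function returns raw field tuples, so the integrity checks from fromBytes would still need to be applied to each one:

```python
from struct import Struct

# Same layout as the Message class: markers and three payload fields.
FORMAT = Struct("!2s150s4s190s4s192s2s")


def parse_stream(stream_bytes):
    """Return (list of raw field tuples, unparsed remainder).

    Walks an offset with Struct.unpack_from instead of re-slicing the
    buffer after every message, then copies the leftover tail once.
    """
    size = FORMAT.size
    records = []
    offset = 0
    while len(stream_bytes) - offset >= size:
        records.append(FORMAT.unpack_from(stream_bytes, offset))
        offset += size
    return records, stream_bytes[offset:]


# Two complete messages followed by the first 10 bytes of a third one.
one = FORMAT.pack(b"\xFD\xAA", b"r" * 150, b"\xCC\xDD\xFD\xCC", b"a" * 190,
                  b"\xCC\xDD\xFD\xCC", b"b" * 192, b"\xCC\xDD")
records, rest = parse_stream(one + one + one[:10])
print(len(records), len(rest))  # 2 10
```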
    

    The important part here for interfacing with Twisted is the final parseStream method, which turns a pile of bytes into a pile of messages, and the remaining bytes that weren't parsed yet. We can then have a Protocol make sense of an actual network stream, like so:

    from twisted.internet.protocol import Protocol

    class MyProtocol(Protocol):
        # Bytes received so far that don't yet form a complete message.
        buffer = b""

        def dataReceived(self, data):
            # Parse every complete message; keep the incomplete tail.
            messages, self.buffer = Message.parseStream(self.buffer + data)
            for message in messages:
                self.messageReceived(message)

        def messageReceived(self, message):
            "do something useful with a message"
    

    Rather than calling self.messageReceived, you might want to call a method on some other attribute of self, or perhaps relay the Message object up to the factory associated with this protocol. It's up to you! Since you said that you want to "parse the packet and store it in a buffer in Factory", perhaps you just want to do self.factory.messagesBuffer.append(message). Hopefully this seems cleaner than your "packet concatenation" approach, which was not described clearly enough for me to understand what you thought was distasteful about it.
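Because the buffering in dataReceived does not depend on how TCP happens to split the stream, it can be checked without a network or a reactor. The following is a minimal, Twisted-free sketch. FakeProtocol is a hypothetical stand-in that mirrors MyProtocol's buffering, and the 6-byte frame (FD AA, 2 payload bytes, CC DD) is a simplified, made-up format that keeps the example short. The chunk sizes are random to imitate arbitrary packet boundaries:

```python
import random
from struct import Struct

# Hypothetical miniature frame: FD AA, 2 payload bytes, CC DD.
FRAME = Struct("!2s2s2s")


def parse_stream(stream_bytes):
    """Split stream_bytes into payloads plus the unparsed remainder."""
    payloads = []
    while len(stream_bytes) >= FRAME.size:
        head, payload, tail = FRAME.unpack(stream_bytes[:FRAME.size])
        if head != b"\xFD\xAA" or tail != b"\xCC\xDD":
            raise ValueError("corrupt frame")
        payloads.append(payload)
        stream_bytes = stream_bytes[FRAME.size:]
    return payloads, stream_bytes


class FakeProtocol:
    """Stand-in for a Twisted Protocol with the same dataReceived buffering."""

    def __init__(self):
        self.buffer = b""
        self.received = []

    def dataReceived(self, data):
        payloads, self.buffer = parse_stream(self.buffer + data)
        self.received.extend(payloads)


# Ten frames, delivered in randomly sized chunks (as TCP may do).
stream = b"".join(FRAME.pack(b"\xFD\xAA", bytes([i, i]), b"\xCC\xDD")
                  for i in range(10))
proto = FakeProtocol()
rng = random.Random(0)
pos = 0
while pos < len(stream):
    step = rng.randint(1, 9)
    proto.dataReceived(stream[pos:pos + step])
    pos += step

print(len(proto.received), proto.buffer)  # 10 b''
```

Whatever the chunk sizes, every frame is recovered once the whole stream has arrived, and the buffer only ever holds an incomplete frame between calls.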