Search code examples
c#tcp.net-core

System.IO.Pipelines Length Field Based TCP Decoding


I'm attempting to port code from DotNetty to System.IO.Pipelines. In DotNetty I'm leveraging the LengthFieldBasedFrameDecoder to decode a TCP message where the first two bytes represent an integer indicating the length of the entire message.

All the demos I've seen rely on string based EOL indicators. i feel this should be easy but it's escaping me how to grab the first two bytes, and then X amount of bytes, as indicated by the length prefix.

Below is an example taken from David Fowler's TcpEcho server. How can I rewrite this to parse the message if the first two bytes indicated the message size rather than an EOL character indicating the end of messsage?

 private static async Task ReadPipeAsync(Socket socket, PipeReader reader)
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();

            ReadOnlySequence<byte> buffer = result.Buffer;
            SequencePosition? position = null;

            do
            {
                // Find the EOL
                position = buffer.PositionOf((byte)'\n');

                if (position != null)
                {
                    var line = buffer.Slice(0, position.Value);
                    ProcessLine(socket, line);

                    // This is equivalent to position + 1
                    var next = buffer.GetPosition(1, position.Value);

                    // Skip what we've already processed including \n
                    buffer = buffer.Slice(next);
                }
            }
            while (position != null);

            // We sliced the buffer until no more data could be processed
            // Tell the PipeReader how much we consumed and how much we left to process
            reader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }

        reader.Complete();
    }

Solution

  • This is what I ended up with:

    private const int lengthPrefixSize = 2; // number of bytes in the length prefix
    private static ushort ParseLengthPrefix(ReadOnlySpan<byte> buffer) => BinaryPrimitives.ReadUInt16LittleEndian(buffer);
    
    private static ushort ParseLengthPrefix(in ReadOnlySequence<byte> buffer)
    {
        if (buffer.First.Length >= lengthPrefixSize)
            return ParseLengthPrefix(buffer.First.Span.Slice(0, lengthPrefixSize));
    
        Span<byte> lengthPrefixBytes = stackalloc byte[lengthPrefixSize];
        buffer.Slice(0, lengthPrefixSize).CopyTo(lengthPrefixBytes);
        return ParseLengthPrefix(lengthPrefixBytes);
    }
    
    private static async Task ReadPipeAsync(Socket socket, PipeReader reader)
    {
        ushort? lengthPrefix = null;
    
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
    
            ReadOnlySequence<byte> buffer = result.Buffer;
    
            while (true)
            {
                if (lengthPrefix == null)
                {
                    // If we don't have enough for the length prefix, then wait for more data.
                    if (buffer.Length < lengthPrefixSize)
                        break;
    
                    // Read and parse the length prefix
                    lengthPrefix = ParseLengthPrefix(buffer);
                    buffer = buffer.Slice(lengthPrefixSize);
                }
    
                // If we haven't read the entire packet yet, then wait.
                if (buffer.Length < lengthPrefix.Value)
                    break;
    
                // Read the data packet
                var line = buffer.Slice(0, lengthPrefix.Value);
                ProcessLine(socket, line);
    
                buffer = buffer.Slice(lengthPrefix.Value);
                lengthPrefix = null;
            }
    
            // We sliced the buffer until no more data could be processed
            // Tell the PipeReader how much we consumed and how much we left to process
            reader.AdvanceTo(buffer.Start, buffer.End);
    
            if (result.IsCompleted)
            {
                break;
            }
        }
    
        reader.Complete();
    }
    

    This solution does have a length prefix buffer, but it's only used if the length prefix is split across spans. There is a SequenceReader<T> coming that I think can make this totally copy-less, though in the case of length prefixes (very few bytes and no buffer allocations), the difference would probably be minimal.