c#, multithreading, async-await, server, networkstream

NetworkStream ReadAsync and Read in the same method


I'm trying to build a scalable server application with persistent TCP connections. The serialization library I use is synchronous, and converting it to the APM pattern would introduce a big overhead (I have already tested this).

The packet format is always one byte for the packet's id, followed by a few more header fields and the payload. Suppose I created an async method such as:

public async Task<Packet> Deserialize(NetworkStream stream)
{
    //Omitting parameters for the methods below for simplicity.
    var id = await stream.ReadAsync();
    var size = stream.Read();
    //Read the rest of the fields synchronously and deserialize.
}
  • Will I risk causing starvation for other sockets if the synchronous Read on one of them takes too long (due to TCP fragmentation, for example)?
  • I thought of reading all the bytes for a packet via ReadAsync (the size is the second field in the header) and then deserializing them synchronously, since that would be a non-blocking operation (roughly along the lines of the sketch below). But that forces me to separate the payload's deserialization context from the header's, which would lead to a lot of duplicate code. Is there a more viable solution if the answer to the question above is yes?
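
To make the second idea concrete, this is a rough sketch of what I mean. ReadExactAsync and DeserializePayload are hypothetical helpers, and the fixed 4-byte size field is a simplification purely for illustration (my real size field is not that simple):

public async Task<Packet> ReadPacketAsync(NetworkStream stream)
{
    var header = new byte[5]; // 1-byte id + 4-byte size (simplified)
    await ReadExactAsync(stream, header, 0, header.Length);
    var size = BitConverter.ToInt32(header, 1);

    var payload = new byte[size];
    await ReadExactAsync(stream, payload, 0, payload.Length);

    // Synchronous, but non-blocking: the bytes are already in memory.
    using (var memoryStream = new MemoryStream(payload, writable: false))
        return DeserializePayload(header[0], memoryStream); // hypothetical
}

private static async Task ReadExactAsync(Stream stream, byte[] buffer, int offset, int count)
{
    // Loop until exactly 'count' bytes have been read from the stream.
    while (count > 0)
    {
        var read = await stream.ReadAsync(buffer, offset, count);
        if (read == 0) throw new EndOfStreamException();
        offset += read;
        count -= read;
    }
}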

Solution

  • Will I risk causing starvation for other sockets

    If for "socket" we read "thread", the answer is no. Task scheduling is adaptive; if your task is what the scheduler determines to be "long running", more worker threads will be drafted into service as necessary. The thread pool that services tasks is carefully designed to respond to circumstances dynamically.

    The main problem is not that you would run out of threads (it's possible to create so many that your application stops responding, but that's very unlikely) but that blocking synchronously in the middle of your async reads throws the whole idea of scalability through asynchronous I/O out the window. This could end up worse than just doing the whole thing on a dedicated thread if the synchronous part dominates.

    Is there a more viable solution

    The general idea is to decouple deserialization from reading data through buffering. Read some bytes, buffer them, determine whether your buffer holds one or more complete packets, remove those from the buffer and deserialize them (possibly leaving some undeserialized data behind). This requires that you either know the size of a packet without deserializing it at all, or that you can feed a number of bytes to your deserialization logic and have it report back "need more data". This can always be made to work unless the deserializer has side effects.
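
    As a hedged sketch of that "need more data" contract (the interface and names here are placeholders, not any particular library's API):

    public interface IPacketDeserializer
    {
        // Attempt to read exactly one packet from the buffered bytes in 'stream'.
        // Returns false if the buffer does not yet contain a complete packet;
        // the caller should then read more bytes from the socket and retry.
        // Must not have side effects on failure, so that a retry is safe.
        bool TryDeserialize(MemoryStream stream, out Packet packet);
    }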

    The problem is that, for example, even just the size is not a simple numeric value, and even its own length isn't a fixed number of bytes. I could create an instance of the deserializer exclusively to read the size synchronously (usually just 3-4 bytes), then read the payload asynchronously and finally deserialize the latter, but that adds quite some pressure on the GC, as well as splitting the code up even further.

    Two things here:

    • You don't need to deserialize just the size. Just give the deserializer a whole chunk of data, including the size(s), and see what it spits out. The only way this doesn't work is if it insists that you hand it an exact number of bytes (that is, not "too many"). But since your deserializer is stream-based, I don't see that as a problem.
    • GC pressure should be fairly limited if you construct your MemoryStreams directly on your buffers by using the appropriate constructor that wraps an array segment rather than copying the data (see the short sketch after this list). Now all you need to worry about is the MemoryStream objects themselves, but you'd typically create only short-lived ones that get cleaned up in generation 1, so it's no big deal.
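
    To make the second point concrete, a minimal sketch of the non-copying constructor I mean (the four-argument overload that wraps an existing array segment):

    // Wraps buffer[offset .. offset + count) without copying the bytes;
    // disposing the stream does not touch the underlying array, so the
    // buffer can be reused for the next read.
    using (var memoryStream = new MemoryStream(buffer, offset, count, writable: false))
    {
        // hand memoryStream to the deserializer here
    }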

    The general idea is something like this:

    byte[] buffer = new byte[65536]; // must be large enough to hold at least one complete packet
    int offset = 0;
    int bytesRead = await stream.ReadAsync(buffer, offset, buffer.Length - offset);
    int bytesRemaining = bytesRead;
    while (bytesRemaining != 0 && haveCompletishPacket(buffer, offset, bytesRemaining)) {
        using (var memoryStream = new MemoryStream(buffer, offset, bytesRemaining)) {
            var packet = deserializer.Deserialize(memoryStream);
            // deserialize as much as possible; if you run out of data,
            // just reinit the deserializer and return

            // if we got here we have a packet, produce it
            offset += (int)memoryStream.Position;
            bytesRemaining -= (int)memoryStream.Position;
        }
    }
    

    The details of making sure you are maintaining the buffer correctly are easy to get wrong, so I may have gotten it wrong in the code above. I hope the idea is clear nevertheless.
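
    One of those easy-to-get-wrong details is what happens between reads. A sketch of the usual bookkeeping, compacting any undeserialized tail to the front of the buffer before the next ReadAsync appends to it:

    // Between reads: keep any undeserialized tail by moving it to the front
    // of the buffer, then let the next ReadAsync append after it.
    if (bytesRemaining > 0 && offset > 0)
        Array.Copy(buffer, offset, buffer, 0, bytesRemaining);
    offset = 0; // the next deserialization pass starts at the front again

    int read = await stream.ReadAsync(buffer, bytesRemaining, buffer.Length - bytesRemaining);
    bytesRemaining += read;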

    Obviously this works best if haveCompletishPacket can tell you with 100% accuracy whether there is a complete packet in the buffer before we try the deserializer at all (and that is possible if your packets are always framed with constant-size length fields), but it will be "good enough" if it just makes a best effort, as long as we read enough data and packets aren't too big.
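
    For illustration only: if packets were framed as a 1-byte id followed by a fixed 4-byte length (which is not quite your format, since your size field is variable-length), haveCompletishPacket could be an exact check along these lines:

    // Exact completeness check for a hypothetical fixed frame:
    // [1-byte id][4-byte little-endian length][payload].
    static bool haveCompletishPacket(byte[] buffer, int offset, int count)
    {
        const int headerSize = 5;
        if (count < headerSize)
            return false;
        int payloadSize = BitConverter.ToInt32(buffer, offset + 1);
        return count >= headerSize + payloadSize;
    }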