c# sockets udp memorystream binarywriter

Reusing Streams in a UDP Server


I'm writing a UDP server and want to reduce the number of allocations made per incoming and outgoing datagram. One of the things I'd like to do is reuse the Streams I allocate to read and write my datagrams.

I have this method, which uses a BinaryReader instance to read the received buffer into an object when a packet arrives.

// Simplified version of the receive data method.
private void ReceiveClientData(IAsyncResult result)
{
    int receivedData = socket.EndReceiveFrom(result, ref endPoint);
    if (receivedData == 0)
    {
        this.ListenForData(socket);
        return;
    }

    // Read the header in from the buffer first so we know what kind of message and how to route.
    this.binaryReader.BaseStream.SetLength(receivedData);
    this.binaryReader.BaseStream.Seek (0, SeekOrigin.Begin);

    // Can't reset the MemoryStream buffer to the new packet buffer.
    // How does this get consumed by the stream without allocating a new MemoryStream(buffer)?
    byte[] buffer = (byte[])result.AsyncState;

    // Deserialize the bytes into the header and datagram.
    IClientHeader header = new ClientHeader();
    header.GetMessageFromBytes(this.binaryReader);
    IClientDatagram datagram = this.datagramFactory.CreateDatagramFromClientHeader(header);
    datagram.GetMessageFromBytes(this.binaryReader);

    this.ListenForData(socket);
}

Is it safe to reuse the same BinaryReader and underlying MemoryStream, provided I Seek back to the beginning of the stream? From what I've read, the Socket won't run two reads or two writes concurrently, which is the only case that would force me to use a separate Stream per call to EndReceiveFrom. It does sound like a read and a write can be in flight at the same time, just not concurrent reads or concurrent writes. Because of this, I would think I could reuse the reader, but I'm not sure. To do so, I'd have to give the MemoryStream a new buffer to read from. Can you do that, or am I required to create a new MemoryStream for each packet received from the socket?

I'd like to take this same approach with sending packets back to the client. In order to do that I've tried setting the length of my MemoryStream to 0 and seeking back to the beginning. That doesn't appear to clear the buffer, as described in another answer here. I still see the old buffer data in t below.

public void SendMessage(IServerDatagram message)
{
    this.binaryStream.Seek(0, SeekOrigin.Begin);
    this.binaryStream.SetLength (0);

    // Contains old buffer data.
    var t = this.binaryStream.GetBuffer ();
    message.Serialize (this.binaryWriter);

    // Contains a mixture of new bytes + old bytes.
    byte[] data = this.binaryStream.GetBuffer();
    this.binaryWriter.Flush ();

    // Send the datagram packet.
    this.udpServerSocket.SendTo(data, this.connectedClients.FirstOrDefault().Key);
}

I don't know ahead of time how large the buffer needs to be, so I can't SetLength to the size of my new buffer. Is there no other way to reuse the Stream without having to instantiate a new one for every message? If I'm sending thousands of messages a second, that could cause some memory pressure, couldn't it?


Solution

  • I experienced related problems using streams and readers and network stuff - and opted to take a somewhat different approach. I studied the BinaryReader, BinaryWriter and BitConverter implementations and wrote extension methods to read and write data directly on the underlying byte[] buffer. Mild PITA, but I don't have streams and readers or writers any more.

    Here, for example, is an int-writing extension method:

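    [System.Security.SecuritySafeCritical]
    public static unsafe int Write( this byte[ ] array, int value, int offset = 0 )
    {
      // Reject negative offsets as well as writes that would run past the end of the array.
      if ( offset < 0 || offset > array.Length - sizeof( int ) ) throw new IndexOutOfRangeException( );
      fixed ( byte* b = array )
      {
        // Store the int in native byte order at the requested offset.
        *( ( int* ) ( b + offset ) ) = value;
      }
      // Report how many bytes were written so the caller can advance its position.
      return sizeof( int );
    }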
    

    The return might look dopey, but all the extension methods return the number of bytes I've "moved" into the byte array...and so I can consistently know where I'm at; pseudo-stream, if you will. Also, the writing code doesn't always know what it's writing. There are identical overloads for each simple type, one for strings, and one for byte[].
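
    As a rough illustration of that pseudo-stream pattern, here is a minimal sketch that avoids unsafe code. It assumes System.Buffers.Binary.BinaryPrimitives is available (.NET Core 2.1 or later, or the System.Memory package). The class name, the length-prefixed UTF-8 string layout, and the fixed little-endian byte order are assumptions made for illustration, not the implementation described above:

    ```csharp
    using System;
    using System.Buffers.Binary;
    using System.Text;

    // Hypothetical helper class; the names and wire layout are illustrative only.
    public static class ByteArrayWriter
    {
        // Writes a 32-bit int (little-endian) at offset and returns the bytes written.
        public static int Write(this byte[] array, int value, int offset = 0)
        {
            // AsSpan validates offset and length and throws if the slice does not fit.
            BinaryPrimitives.WriteInt32LittleEndian(array.AsSpan(offset, sizeof(int)), value);
            return sizeof(int);
        }

        // Writes a string as a 4-byte length prefix followed by its UTF-8 bytes.
        public static int Write(this byte[] array, string value, int offset = 0)
        {
            int byteCount = Encoding.UTF8.GetByteCount(value);
            int written = array.Write(byteCount, offset);
            // Encodes straight into the target array, so no temporary byte[] is allocated.
            Encoding.UTF8.GetBytes(value, 0, value.Length, array, offset + written);
            return written + byteCount;
        }
    }

    public static class Program
    {
        public static void Main()
        {
            byte[] buffer = new byte[64];   // long-lived buffer, reused for every message
            int position = 0;

            // Each call reports how far it advanced, so the caller tracks the position.
            position += buffer.Write(42, position);
            position += buffer.Write("hello", position);

            // 4 (int) + 4 (length prefix) + 5 (UTF-8 bytes of "hello")
            Console.WriteLine(position); // prints 13
        }
    }
    ```

    Because position ends up as the number of valid bytes, it can be passed directly as the size argument when sending the buffer.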

    My larger concern was on the client side, where I have a more constrained operating environment...and I maintain a long-lived read buffer. The UdpClient.Receive method produces a new byte array for every read, and that just killed me. GC gone mad...which, of course, is very disruptive to UDP streaming :-) So, I found the Receive and ReceiveAsync implementations, discovered that I could control what the underlying socket needed to be fed, and again made my own ReceiveBroadcastToBuffer extension method:

    const int MaxUdpSize = 0x0000ffff;
    const int AnyPort = 0;
    static readonly EndPoint anyV4Endpoint = new IPEndPoint( IPAddress.Any, AnyPort );
    static readonly EndPoint anyV6Endpoint = new IPEndPoint( IPAddress.IPv6Any, AnyPort );
    
    /// <summary>Receives a UDP datagram into the specified buffer at the specified offset</summary>
    /// <returns>The length of the received data</returns>
    public static int ReceiveBroadcastToBuffer( this UdpClient client, byte[ ] buffer, int offset = 0 )
    {
      var socket = client.Client;
      // ReceiveFrom replaces its EndPoint argument with the sender's address,
      // so pass a local variable instead of the shared static field.
      EndPoint remote = socket.AddressFamily == AddressFamily.InterNetwork ? anyV4Endpoint : anyV6Endpoint;
      // Never ask for more bytes than the buffer can hold from the given offset.
      int size = Math.Min( MaxUdpSize, buffer.Length - offset );
      return socket.ReceiveFrom( buffer, offset, size, SocketFlags.None, ref remote );
    }
    

    This is almost exactly what UdpClient.Receive does...except that I govern the buffer. I found that UdpClient itself reads into a single internal buffer (and then copies each datagram out into a new array), and I just use a plain ol' Socket on the send side.
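
For the stream-based approach in the question, a minimal sketch of reusing one MemoryStream per direction might look like the following. It assumes the socket always receives into the same byte[] (here called receiveBuffer) and that calls are not made concurrently; the class and method names are hypothetical, and the socket calls are simulated so the example runs on its own. Two details matter: GetBuffer() returns the whole backing array, so only the first Length bytes are valid after a Flush, and growing a MemoryStream with SetLength zero-fills the newly exposed region, so the receive stream is left at full length and bounded by the received byte count instead.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical sketch; names are illustrative and the socket I/O is simulated.
public static class StreamReuseSketch
{
    // Sending side: one expandable stream and writer, reused for every message.
    private static readonly MemoryStream sendStream = new MemoryStream(512);
    private static readonly BinaryWriter writer = new BinaryWriter(sendStream, Encoding.UTF8, true);

    // Receiving side: the stream wraps the same array the socket receives into.
    private static readonly byte[] receiveBuffer = new byte[65535];
    private static readonly MemoryStream receiveStream = new MemoryStream(receiveBuffer, false);
    private static readonly BinaryReader reader = new BinaryReader(receiveStream, Encoding.UTF8, true);

    // Serializes a value and returns how many bytes of GetBuffer() are valid.
    public static int Serialize(int value)
    {
        sendStream.SetLength(0);   // truncates and moves Position back to 0; stale bytes remain
        writer.Write(value);
        writer.Flush();            // flush before inspecting the buffer
        return (int)sendStream.Length;
    }

    // Reads one int from the datagram that was just received into receiveBuffer.
    public static int Deserialize(int received)
    {
        // The stream length stays at the full buffer size, so validate against
        // the received byte count rather than relying on end-of-stream.
        if (received < sizeof(int)) throw new InvalidDataException("Datagram too short.");
        receiveStream.Position = 0;
        return reader.ReadInt32();
    }

    public static void Main()
    {
        int length = Serialize(42);
        byte[] raw = sendStream.GetBuffer();
        // With a real socket: socket.SendTo(raw, 0, length, SocketFlags.None, remoteEndPoint);

        // Simulate the socket filling receiveBuffer with the datagram.
        Buffer.BlockCopy(raw, 0, receiveBuffer, 0, length);
        Console.WriteLine(Deserialize(length)); // prints 42
    }
}
```

Passing the offset and length to SendTo is what avoids the mixture of new and old bytes: the stale data is still in the array, but it is never sent.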