Search code examples
c#socketsasynchronoustcpclient

Read asynchronously data from NetworkStream with huge amount of packets


In my application every packet has 2 bytes length on the start. However after some time application starts receiving length less than zero. In synchronous client everything works correctly, but it's too slow. I'm 100% sure in Server everything is correct.

Connect:

    public void Connect(IPAddress ip, int port)
    {
        tcpClient.Connect(ip, port);
        stream = tcpClient.GetStream();
        byte[] len_buffer = new byte[2];
        stream.BeginRead(len_buffer, 0, len_buffer.Length, OnDataRead, len_buffer);
    }

OnDataRead:

    private void OnDataRead(IAsyncResult ar)
    {
            byte[] len = ar.AsyncState as byte[];
            int length = BitConverter.ToInt16(len, 0);
            byte[] buffer = new byte[length];

            int remaining = length;
            int pos = 0;
            while (remaining != 0)
            {
                int add = stream.Read(buffer, pos, remaining);
                pos += add;
                remaining -= add;
            }
            Process(buffer);
            len = new byte[2];

            stream.EndRead(ar);
            stream.BeginRead(len, 0, len.Length, OnDataRead, len);
    }

Solution

  • As I can see, you're mixing up synchronious and asynchronious. That's a bad practice.

    What you want is something like:

    var header = ReadHeader(); // 2 bytes
    var data = ReadData(header.DataSize);
    

    I didn't use the network stream, but.... Here's an example of my async SocketReader:

    public static class SocketReader
    {
        // This method will continues read until count bytes are read. (or socket is closed)
        private static void DoReadFromSocket(Socket socket, int bytesRead, int count, byte[] buffer, Action<ArraySegment<byte>> endRead)
        {
            // Start a BeginReceive.
            try
            {
                socket.BeginReceive(buffer, bytesRead, count - bytesRead, SocketFlags.None, (asyncResult) =>
                {
                    // Get the bytes read.
                    int read = 0;
                    try
                    {
                        // if this goes wrong, the read remains 0
                        read = socket.EndReceive(asyncResult);
                    }
                    catch (ObjectDisposedException) { }
                    catch (Exception exception)
                    {
                        Trace.TraceError(exception.Message);
                    }
    
    
                    // if zero bytes received, the socket isn't available anymore.
                    if (read == 0)
                    {
                        endRead(new ArraySegment<byte>(buffer, 0, 0));
                        return;
                    }
    
                    // increase the bytesRead, (position within the buffer)
                    bytesRead += read;
    
                    // if all bytes are read, call the endRead with the buffer.
                    if (bytesRead == count)
                        // All bytes are read. Invoke callback.
                        endRead(new ArraySegment<byte>(buffer, 0, count));
                    else
                        // if not all bytes received, start another BeginReceive.
                        DoReadFromSocket(socket, bytesRead, count, buffer, endRead);
    
                }, null);
            }
            catch (Exception exception)
            {
                Trace.TraceError(exception.Message);
                endRead(new ArraySegment<byte>(buffer, 0, 0));
            }
        }
    
        public static void ReadFromSocket(Socket socket, int count, Action<ArraySegment<byte>> endRead)
        {
            // read from socket, construct a new buffer.
            DoReadFromSocket(socket, 0, count, new byte[count], endRead);
        }
    
        public static void ReadFromSocket(Socket socket, int count, byte[] buffer, Action<ArraySegment<byte>> endRead)
        {
            // if you do have a buffer available, you can pass that one. (this way you do not construct new buffers for receiving and able to reuse buffers)
    
            // if the buffer is too small, raise an exception, the caller should check the count and size of the buffer.
            if (count > buffer.Length)
                throw new ArgumentOutOfRangeException(nameof(count));
    
            DoReadFromSocket(socket, 0, count, buffer, endRead);
        }
    }
    

    Usage:

    SocketReader.ReadFromSocket(socket, 2, (headerData) =>
    {
        if(headerData.Count == 0)
        {
            // nothing/closed
            return;
        }
    
        // Read the length of the data.
        int length = BitConverter.ToInt16(headerData.Array, headerData.Offset);
    
        SocketReader.ReadFromSocket(socket, length, (dataBufferSegment) =>
        {
            if(dataBufferSegment.Count == 0)
            {
                // nothing/closed
                return;
            }
    
            Process(dataBufferSegment);
    
            // extra: if you need a binaryreader..
            using(var stream = new MemoryStream(dataBufferSegment.Array, dataBufferSegment.Offset, dataBufferSegment.Count))
            using(var reader = new BinaryReader(stream))
            {
                var whatever = reader.ReadInt32();
            }
        }
    });
    

    You can optimize the receive buffer by passing a buffer (look at the overloads)


    Continues receiving: (reusing receivebuffer)

    public class PacketReader
    {
        private byte[] _receiveBuffer = new byte[2];
    
        // This will run until the socket is closed.    
        public void StartReceiving(Socket socket, Action<ArraySegment<byte>> process)
        {
            SocketReader.ReadFromSocket(socket, 2, _receiveBuffer, (headerData) =>
            {
                if(headerData.Count == 0)
                {
                    // nothing/closed
                    return;
                }
    
                // Read the length of the data.
                int length = BitConverter.ToInt16(headerData.Array, headerData.Offset);
    
                // if the receive buffer is too small, reallocate it.
                if(_receiveBuffer.Length < length)
                    _receiveBuffer = new byte[length];
    
                SocketReader.ReadFromSocket(socket, length, _receiveBuffer, (dataBufferSegment) =>
                {
                    if(dataBufferSegment.Count == 0)
                    {
                        // nothing/closed
                        return;
                    }
    
                    try
                    {
                        process(dataBufferSegment);
                    }
                    catch { }
    
                    StartReceiving(socket, process);
                });
            }); 
        }
    }
    

    Usage:

    private PacketReader _reader;
    
    public void Start()
    {
        _reader = new PacketReader(socket, HandlePacket);
    }
    
    private void HandlePacket(ArraySegment<byte> packet)
    {
        // do stuff.....
    }