Search code examples
c#async-awaitasyncsocket

NetworkStream AsyncWrite speed


I am learning async socket programming and for a bit harder project I thought of creating a server for group chat. Managed to successfully do it, but I am not sure if performance is good enough and think I am doing something wrong.

Basically, I connect 400 users to the server and then send 1000 messages (message is 1kB, with prefixed length and rest is empty) from one of the users. Server needs to broadcast every message to all 400 users. There is List of NetworkStreams on the server and when server receives a message, it iterates through the list and calls stream.WriteAsync method. However, it seems to take server 40-50ms to send that message to all 400 users. During the test, server CPU usage is at ~4% and StressClient CPU usage is at ~55%. I was expecting it would be way faster than 40-50ms. Am I doing something wrong or is this maximum speed?

Here is server code (last 2 methods are the most relevant, ReceiveMessageAsync and SendToAllAsync)

private List<NetworkStream> connectedUsers = new List<NetworkStream>();
private int processedRequestsAmount = 0;
private Stopwatch sw = new Stopwatch();

public ServerEngine()
{
}

public void Start(IPAddress ipAddress, int port)
{
    TcpListener listener = new TcpListener(ipAddress, port);
    try
    {
        listener.Start();
        AcceptClientsAsync(listener);

        while (true)
        {
            Console.ReadKey(true);
            Console.WriteLine("Processed requests: " + processedRequestsAmount);
        }
    }
    finally
    {
        listener.Stop();
        Console.WriteLine("Server stopped! Press ENTER to close application...");
        Console.ReadLine();
    }
}

private async Task AcceptClientsAsync(TcpListener listener)
{
    while (true)
    {
        try
        {
            TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);
            StartClientListenerAsync(client);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}

private async Task StartClientListenerAsync(TcpClient client)
{
    using (client)
    {
        var buf = new byte[1024];
        NetworkStream stream = client.GetStream();
        lock (connectedUsers)
        {
            connectedUsers.Add(stream);
        }
        Console.WriteLine(connectedUsers.Count + " users connected!");

        while (true)
        {
            try
            {
                await RecieveMessageAsync(stream, buf).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                break;
            }
        }

        connectedUsers.Remove(stream);
        Console.WriteLine("User disconnected.");
    }
}

private async Task RecieveMessageAsync(NetworkStream stream, byte[] readBuffer)
{
    int totalAmountRead = 0;

    // read header (length, 2 bytes total)
    while (totalAmountRead < 2)
    {
        totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, 2 - totalAmountRead).ConfigureAwait(false);
    }

    short totalLength = BitConverter.ToInt16(readBuffer, 0);

    // read rest of the message
    while (totalAmountRead < totalLength)
    {
        totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
    }

    await SendToAllAsync(readBuffer, totalLength);
}

private async Task SendToAllAsync(byte[] buffer, short totalLength)
{
    List<Task> tasks = new List<Task>(connectedUsers.Count);
    if (processedRequestsAmount == 0)
    {
        sw.Start();
    }

    foreach (NetworkStream stream in connectedUsers)
    {
        tasks.Add(stream.WriteAsync(buffer, 0, buffer.Length)); 
    }

    await Task.WhenAll(tasks).ConfigureAwait(false);

    processedRequestsAmount++;
    if (processedRequestsAmount == 1000)
    {
        sw.Stop();
        Console.WriteLine("Average time for sending 400 messages is {0} ms", sw.Elapsed.TotalMilliseconds / 1000.0);
    }
}

Solution

  • Turns out when I run the Server and ClientStressTest applications without debugging (ctrl+F5 in Visual Studio), it takes server only 5ms (CPU usage at ~30%) to send message to 400 users which is way better than I ever hoped for. Could someone explain to me why would attaching debugger slow things down so much?

    Anyway, here is rest of the code if someone needs it to figure this out

    ClientStressTest's Program.cs

    class Program
    {
        static int NumOfClients = 400;
        static int NumOfMessages = 1000;
    
        static NetworkStream[] Streams = new NetworkStream[NumOfClients];
        static byte[] Message = new byte[1024];
    
        static void Main(string[] args)
        {
            Buffer.BlockCopy(BitConverter.GetBytes((short)1024), 0, Message, 0, sizeof(short));
            Console.WriteLine("Press ENTER to run setup");
            Console.ReadLine();
    
            Setup().Wait();
    
    
            Console.WriteLine("Press ENTER to start sending");
            Console.ReadLine();
    
    
            NetworkStream sender = Streams[0];
            for (int i = 0; i < NumOfMessages; i++)
            {
                sender.WriteAsync(Message, 0, 1024);
            }
    
            Console.ReadLine();
        }
    
        static async Task Setup()
        {
            for (int i = 0; i < Streams.Length; i++)
            {
                TcpClient tcpClient = new TcpClient();
                tcpClient.Connect("localhost", 4000);
                NetworkStream stream = tcpClient.GetStream();
                Streams[i] = stream;
                Task.Run(() => CallbackListener(stream));
            }
        }
    
        static int counter = 0;
        static object objLock = new object();
        static async Task CallbackListener(NetworkStream stream)
        {
            var readBuffer = new byte[1024];
            int totalAmountRead;
            short totalLength;
    
            while (true)
            {
                totalAmountRead = 0;
                while (totalAmountRead < 2)
                {
                    totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, 2 - totalAmountRead).ConfigureAwait(false);
                }
    
                totalLength = BitConverter.ToInt16(readBuffer, 0);
    
                while (totalAmountRead < totalLength)
                {
                    totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
                }
    
                lock(objLock)
                {
                    counter++;
                    if (counter % 1000 == 0)
                    {
                        // to see progress
                        Console.WriteLine(counter);
                    }
                }
                // do nothing
            }
        }
    }
    

    Server's Program.cs

    class Program
    {
        static void Main(string[] args)
        {
            var server = new ServerEngine();
            server.Start(IPAddress.Any, 4000);
        }
    }
    

    Server's ServerEngine.cs

    public class ServerEngine
    {
        private List<NetworkStream> connectedUsers = new List<NetworkStream>();
        private int processedRequestsAmount = 0;
        private Stopwatch sw = new Stopwatch();
    
        public ServerEngine()
        {
        }
    
        public void Start(IPAddress ipAddress, int port)
        {
            TcpListener listener = new TcpListener(ipAddress, port);
            try
            {
                listener.Start();
                AcceptClientsAsync(listener);
    
                while (true)
                {
                    Console.ReadKey(true);
                    Console.WriteLine("Processed requests: " + processedRequestsAmount);
                }
            }
            finally
            {
                listener.Stop();
                Console.WriteLine("Server stopped! Press ENTER to close application...");
                Console.ReadLine();
            }
        }
    
        private async Task AcceptClientsAsync(TcpListener listener)
        {
            while (true)
            {
                try
                {
                    TcpClient client = await listener.AcceptTcpClientAsync().ConfigureAwait(false);
                    StartClientListenerAsync(client);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }
    
        private async Task StartClientListenerAsync(TcpClient client)
        {
            using (client)
            {
                var buf = new byte[1024];
                NetworkStream stream = client.GetStream();
                lock (connectedUsers)
                {
                    connectedUsers.Add(stream);
                }
                Console.WriteLine(connectedUsers.Count + " users connected!");
    
                while (true)
                {
                    try
                    {
                        await RecieveMessageAsync(stream, buf).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        break;
                    }
                }
    
                connectedUsers.Remove(stream);
                Console.WriteLine("User disconnected.");
            }
        }
    
        private async Task RecieveMessageAsync(NetworkStream stream, byte[] readBuffer)
        {
            int totalAmountRead = 0;
    
            // read header (length, 2 bytes total)
            while (totalAmountRead < 2)
            {
                totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, 2 - totalAmountRead).ConfigureAwait(false);
            }
    
            short totalLength = BitConverter.ToInt16(readBuffer, 0);
    
            // read rest of the message
            while (totalAmountRead < totalLength)
            {
                totalAmountRead += await stream.ReadAsync(readBuffer, totalAmountRead, totalLength - totalAmountRead).ConfigureAwait(false);
            }
    
            await SendToAll(readBuffer, totalLength).ConfigureAwait(false);
        }
    
        private async Task SendToAll(byte[] buffer, short totalLength)
        {
            List<Task> tasks = new List<Task>(connectedUsers.Count);
            if (processedRequestsAmount == 0)
            {
                sw.Start();
            }
    
            foreach (NetworkStream stream in connectedUsers)
            {
                tasks.Add(stream.WriteAsync(buffer, 0, buffer.Length));
            }
    
            await Task.WhenAll(tasks).ConfigureAwait(false);
    
            processedRequestsAmount++;
            if (processedRequestsAmount == 1000)
            {
                sw.Stop();
                Console.WriteLine("Average time for sending 400 messages is {0} ms", sw.Elapsed.TotalMilliseconds / 1000.0);
            }
        }
    }