Search code examples
c#.nettcpclientwaithandle

Waiting for multiple TcpClients to have data available - WaitHandle or Thread.Sleep?


I am writing a server application that will receive data from multiple TCP connections. We would like to be able to scale to ~200 connections. The first algorithm I wrote for this is as follows:

while (keepListening)
{
    foreach (TcpClient client in clientList)
    {
        if (!client.Connected)
        {
            client.Close();
            deleteList.Add(client);
            continue;
        }

        int dataAvail = client.Available;
        if (dataAvail > 0)
        {
            NetworkStream netstr = client.GetStream();
            byte[] arry = new byte[dataAvail];
            netstr.Read(arry, 0, dataAvail);
            MemoryStream ms = new MemoryStream(arry);
            try
            {
                CommData data = dataDeserializer.Deserialize(ms) as CommData;
                beaconTable.BeaconReceived(data);
            }
            catch
            { }
        }
    }

    foreach (TcpClient clientToDelete in deleteList)
        clientList.Remove(clientToDelete);
    deleteList.Clear();

    while (connectionListener.Pending())
        clientList.Add(connectionListener.AcceptTcpClient());

    Thread.Sleep(20);
}

This works fine, though I found that I have to add the Thread.Sleep to slow down the loop, otherwise it takes up an entire core, no matter how many or few connections there are. I was advised that Thread.Sleep is generally considered bad, so I looked for some alternatives. In a similar question to this, I was recommended to use BeginRead and BeginAccept using WaitHandles, so I wrote up an algorithm to do the same thing using that, and came up with this:

while (keepListening)
{
    int waitResult = WaitHandle.WaitAny(waitList.Select(t => t.AsyncHandle.AsyncWaitHandle).ToArray(), connectionTimeout);

    if (waitResult == WaitHandle.WaitTimeout)
        continue;

    WaitObject waitObject = waitList[waitResult];
    Type waitType = waitObject.WaitingObject.GetType();

    if (waitType == typeof(TcpListener))
    {
        TcpClient newClient = (waitObject.WaitingObject as TcpListener).EndAcceptTcpClient(waitObject.AsyncHandle);
        waitList.Remove(waitObject);

        byte[] newBuffer = new byte[bufferSize];
        waitList.Add(new WaitObject(newClient.GetStream().BeginRead(newBuffer, 0, bufferSize, null, null), newClient, newBuffer));

        if (waitList.Count < 64)
            waitList.Add(new WaitObject(connectionListener.BeginAcceptTcpClient(null, null), connectionListener, null));
        else
        {
            connectionListener.Stop();
            listening = false;
        }
    }
    else if (waitType == typeof(TcpClient))
    {
        TcpClient currentClient = waitObject.WaitingObject as TcpClient;
        int bytesRead = currentClient.GetStream().EndRead(waitObject.AsyncHandle);

        if (bytesRead > 0)
        {
            MemoryStream ms = new MemoryStream(waitObject.DataBuffer, 0, bytesRead);
            try
            {
                CommData data = dataDeserializer.Deserialize(ms) as CommData;
                beaconTable.BeaconReceived(data);
            }
            catch
            { }
            byte[] newBuffer = new byte[bufferSize];
            waitList.Add(new WaitObject(currentClient.GetStream().BeginRead(newBuffer, 0, bufferSize, null, null), currentClient, newBuffer));
        }
        else
        {
            currentClient.Close();
        }

        waitList.Remove(waitObject);

        if (!listening && waitList.Count < 64)
        {
            listening = true;
            connectionListener.Start();
            waitList.Add(new WaitObject(connectionListener.BeginAcceptTcpClient(null, null), connectionListener, null));
        }
    }
    else
        throw new ApplicationException("An unknown type ended up in the wait list somehow: " + waitType.ToString());
}

This also works fine, until I hit 64 clients. I wrote in a limit to not accept more than 64 clients because that's the maximum number of WaitHandles that WaitAny will accept. I can't see any good way around this limit, so I basically can't maintain more than 64 connections like this. The Thread.Sleep algorithm works fine with 100+ connections.

I also don't really like having to pre-allocate a receive array of arbitrary size rather than allocating it at the exact size of the received data after the data is received. And I have to give WaitAny a timeout anyways, or else it prevents the thread running this from Join-ing when I close the application if there are no connections. And it's just generally longer and more complex.

So why is Thread.Sleep the worse solution? Is there some way I can at least get the WaitAny version to handle more than 64 connections? Is there some completely different way of handling this that I'm not seeing?


Solution

  • Jim gave the obvious suggestion of using Async callbacks instead of WaitHandles. I initially thought that this would be too complex, but it got a lot simpler once I realized that I could pass a reference to the calling TcpListener or TcpClient in the state object. With that and a few changes for thread-safety, it's ready to go. It tests fine with over 100 connections and has no problem with exiting cleanly. I would still like an alternative, though, to having to pre-allocate the data buffer. Here's the code for anyone trying something similar:

    public class NetworkReceiver : IDisposable
    {
        private IReceiver beaconTable;
        private XmlSerializer dataDeserializer;
        private HashSet<TcpClient> ClientTable;
        private TcpListener connectionListener;
        private int bufferSize = 1000;
    
        public NetworkReceiver(IReceiver inputTable)
        {
            beaconTable = inputTable;
            dataDeserializer = new XmlSerializer(typeof(CommData));
    
            ClientTable = new HashSet<TcpClient>();
            connectionListener = new TcpListener(IPAddress.Any, SharedData.connectionPort);
            connectionListener.Start();
            connectionListener.BeginAcceptTcpClient(ListenerCallback, connectionListener);
        }
    
        private void ListenerCallback(IAsyncResult callbackResult)
        {
            TcpListener listener = callbackResult.AsyncState as TcpListener;
            TcpClient client;
    
            try
            {
                client = listener.EndAcceptTcpClient(callbackResult);
    
                lock (ClientTable)
                    ClientTable.Add(client);
    
                ClientObject clientObj = new ClientObject() { AsyncClient = client, Buffer = new byte[bufferSize] };
                client.GetStream().BeginRead(clientObj.Buffer, 0, bufferSize, ClientReadCallback, clientObj);
    
                listener.BeginAcceptTcpClient(ListenerCallback, listener);
            }
            catch (ObjectDisposedException)
            {
                return;
            }
        }
    
        private void ClientReadCallback(IAsyncResult callbackResult)
        {
            ClientObject clientObj = callbackResult.AsyncState as ClientObject;
            TcpClient client = clientObj.AsyncClient;
    
            if (!client.Connected)
                return;
    
            try
            {
                int bytesRead = client.GetStream().EndRead(callbackResult);
                if (bytesRead > 0)
                {
                    MemoryStream ms = new MemoryStream(clientObj.Buffer, 0, bytesRead);
                    try
                    {
                        CommData data;
                        lock (dataDeserializer)
                            data = dataDeserializer.Deserialize(ms) as CommData;
                        lock (beaconTable)
                            beaconTable.BeaconReceived(data);
                    }
                    catch
                    { }
    
                    client.GetStream().BeginRead(clientObj.Buffer, 0, bufferSize, ClientReadCallback, clientObj);
                }
                else
                {
                    client.Close();
                    lock (ClientTable)
                        ClientTable.Remove(client);
                }
            }
            catch (Exception ex)
            {
                if (ex.GetType() == typeof(ObjectDisposedException) || ex.GetType() == typeof(InvalidOperationException))
                    return;
                else
                    throw;
            }
        }
    
        class ClientObject
        {
            public TcpClient AsyncClient;
            public byte[] Buffer;
        }
    
        public void Dispose()
        {
            connectionListener.Stop();
            foreach (TcpClient client in ClientTable)
                client.Close();
        }
    }