I am writing a server application that will receive data from multiple TCP connections. We would like to be able to scale to ~200 connections. The first algorithm I wrote for this is as follows:
// Polling receive loop: on every pass, visit each known client, read whatever
// bytes are currently available, hand the decoded message to beaconTable, then
// drop dead clients and accept any pending connections.
while (keepListening)
{
    foreach (TcpClient client in clientList)
    {
        // Dead clients are queued in deleteList and removed after the foreach,
        // because clientList cannot be modified while it is being enumerated.
        if (!client.Connected)
        {
            client.Close();
            deleteList.Add(client);
            continue;
        }

        int dataAvail = client.Available;
        if (dataAvail <= 0)
        {
            continue;
        }

        NetworkStream netstr = client.GetStream();
        byte[] arry = new byte[dataAvail];

        // Read reports how many bytes it actually stored; only that prefix of
        // the buffer is valid, so the count must not be ignored.
        int bytesRead = netstr.Read(arry, 0, dataAvail);
        if (bytesRead <= 0)
        {
            continue;
        }

        try
        {
            using (MemoryStream ms = new MemoryStream(arry, 0, bytesRead))
            {
                CommData data = dataDeserializer.Deserialize(ms) as CommData;
                beaconTable.BeaconReceived(data);
            }
        }
        catch (Exception ex)
        {
            // Still best-effort (a bad message must not stop the loop), but the
            // failure is now visible instead of being silently discarded.
            System.Diagnostics.Trace.TraceWarning("Discarding message that could not be processed: " + ex);
        }
    }

    foreach (TcpClient clientToDelete in deleteList)
    {
        clientList.Remove(clientToDelete);
    }
    deleteList.Clear();

    // Accept every connection that is already queued on the listener.
    while (connectionListener.Pending())
    {
        clientList.Add(connectionListener.AcceptTcpClient());
    }

    // Throttle the loop; without a pause it spins continuously.
    Thread.Sleep(20);
}
This works fine, though I found that I have to add the Thread.Sleep to slow down the loop; otherwise it takes up an entire core, no matter how many or how few connections there are. I was advised that Thread.Sleep is generally considered bad, so I looked for some alternatives. In a question similar to this one, it was recommended that I use BeginRead and BeginAccept with WaitHandles, so I wrote an algorithm that does the same thing that way, and came up with this:
// Event-driven receive loop built on WaitHandle.WaitAny. waitList holds one entry
// per outstanding asynchronous operation: a pending accept on the listener or a
// pending read on a client.
// WaitAny accepts at most 64 handles, so the listener is stopped while the list is full.
const int maxWaitHandles = 64;
while (keepListening)
{
    // The timeout lets the loop re-check keepListening even when nothing completes.
    int waitResult = WaitHandle.WaitAny(waitList.Select(t => t.AsyncHandle.AsyncWaitHandle).ToArray(), connectionTimeout);
    if (waitResult == WaitHandle.WaitTimeout)
    {
        continue;
    }

    WaitObject waitObject = waitList[waitResult];
    Type waitType = waitObject.WaitingObject.GetType();
    if (waitType == typeof(TcpListener))
    {
        // A pending accept completed: start reading from the new client.
        TcpClient newClient = (waitObject.WaitingObject as TcpListener).EndAcceptTcpClient(waitObject.AsyncHandle);
        waitList.Remove(waitObject);
        byte[] newBuffer = new byte[bufferSize];
        waitList.Add(new WaitObject(newClient.GetStream().BeginRead(newBuffer, 0, bufferSize, null, null), newClient, newBuffer));
        if (waitList.Count < maxWaitHandles)
        {
            waitList.Add(new WaitObject(connectionListener.BeginAcceptTcpClient(null, null), connectionListener, null));
        }
        else
        {
            // No free slot for another accept handle; pause listening until a client leaves.
            connectionListener.Stop();
            listening = false;
        }
    }
    else if (waitType == typeof(TcpClient))
    {
        TcpClient currentClient = waitObject.WaitingObject as TcpClient;
        bool readRearmed = false;
        try
        {
            int bytesRead = currentClient.GetStream().EndRead(waitObject.AsyncHandle);
            if (bytesRead > 0)
            {
                try
                {
                    using (MemoryStream ms = new MemoryStream(waitObject.DataBuffer, 0, bytesRead))
                    {
                        CommData data = dataDeserializer.Deserialize(ms) as CommData;
                        beaconTable.BeaconReceived(data);
                    }
                }
                catch (Exception ex)
                {
                    // Still best-effort, but the failure is now visible instead of silently dropped.
                    System.Diagnostics.Trace.TraceWarning("Discarding message that could not be processed: " + ex);
                }

                byte[] newBuffer = new byte[bufferSize];
                waitList.Add(new WaitObject(currentClient.GetStream().BeginRead(newBuffer, 0, bufferSize, null, null), currentClient, newBuffer));
                readRearmed = true;
            }
        }
        catch (IOException)
        {
            // The connection failed (for example the peer reset it). Treat this like an
            // orderly close so that one bad client cannot terminate the whole loop.
        }

        if (!readRearmed)
        {
            currentClient.Close();
        }

        // The completed operation's entry is always retired, whether or not a new read was queued.
        waitList.Remove(waitObject);
        if (!listening && waitList.Count < maxWaitHandles)
        {
            // A slot is free again: resume accepting connections.
            listening = true;
            connectionListener.Start();
            waitList.Add(new WaitObject(connectionListener.BeginAcceptTcpClient(null, null), connectionListener, null));
        }
    }
    else
    {
        throw new ApplicationException("An unknown type ended up in the wait list somehow: " + waitType.ToString());
    }
}
This also works fine, until I hit 64 clients. I wrote in a limit to not accept more than 64 clients because that's the maximum number of WaitHandles that WaitAny will accept. I can't see any good way around this limit, so I basically can't maintain more than 64 connections like this. The Thread.Sleep algorithm works fine with 100+ connections.
I also don't really like having to pre-allocate a receive array of arbitrary size rather than allocating it at the exact size of the received data after the data is received. And I have to give WaitAny a timeout anyway; otherwise, if there are no connections, it prevents the thread running this loop from being joined when I close the application. It's also just generally longer and more complex.
So why is Thread.Sleep the worse solution? Is there some way I can at least get the WaitAny version to handle more than 64 connections? Is there some completely different way of handling this that I'm not seeing?
Jim gave the obvious suggestion of using Async callbacks instead of WaitHandles. I initially thought that this would be too complex, but it got a lot simpler once I realized that I could pass a reference to the calling TcpListener or TcpClient in the state object. With that and a few changes for thread-safety, it's ready to go. It tests fine with over 100 connections and has no problem with exiting cleanly. I would still like an alternative, though, to having to pre-allocate the data buffer. Here's the code for anyone trying something similar:
/// <summary>
/// Accepts TCP connections on <c>SharedData.connectionPort</c> and forwards every
/// message it can deserialize as <c>CommData</c> to the supplied receiver.
/// Accepts and reads are driven by asynchronous callbacks, which may run
/// concurrently; shared state is therefore accessed under locks.
/// </summary>
public class NetworkReceiver : IDisposable
{
    private readonly IReceiver beaconTable;
    private readonly XmlSerializer dataDeserializer;
    private readonly HashSet<TcpClient> ClientTable;
    private readonly TcpListener connectionListener;
    private readonly int bufferSize = 1000;

    // Set by Dispose; read and written only while holding the ClientTable lock.
    private bool disposed;

    /// <summary>
    /// Starts listening immediately and queues the first asynchronous accept.
    /// </summary>
    /// <param name="inputTable">Receiver that is handed every decoded message.</param>
    public NetworkReceiver(IReceiver inputTable)
    {
        beaconTable = inputTable;
        dataDeserializer = new XmlSerializer(typeof(CommData));
        ClientTable = new HashSet<TcpClient>();
        connectionListener = new TcpListener(IPAddress.Any, SharedData.connectionPort);
        connectionListener.Start();
        connectionListener.BeginAcceptTcpClient(ListenerCallback, connectionListener);
    }

    /// <summary>
    /// Completes an accept, re-arms the listener, and starts the first read on the new client.
    /// </summary>
    private void ListenerCallback(IAsyncResult callbackResult)
    {
        TcpListener listener = (TcpListener)callbackResult.AsyncState;
        TcpClient client;
        try
        {
            client = listener.EndAcceptTcpClient(callbackResult);
        }
        catch (ObjectDisposedException)
        {
            // The listener was stopped (see Dispose); nothing more to accept.
            return;
        }
        catch (SocketException)
        {
            // This particular connection attempt failed; keep accepting others.
            BeginNextAccept(listener);
            return;
        }

        // Re-arm the accept before touching the new client, so that a failure
        // while setting that client up cannot stop further connections.
        BeginNextAccept(listener);

        lock (ClientTable)
        {
            if (disposed)
            {
                // Accepted while shutting down: Dispose has already taken its snapshot
                // of the table, so this client must be closed here.
                client.Close();
                return;
            }
            ClientTable.Add(client);
        }

        ClientObject clientObj = new ClientObject() { AsyncClient = client, Buffer = new byte[bufferSize] };
        try
        {
            client.GetStream().BeginRead(clientObj.Buffer, 0, clientObj.Buffer.Length, ClientReadCallback, clientObj);
        }
        catch (IOException)
        {
            DropClient(client);
        }
        catch (InvalidOperationException)
        {
            // Also covers ObjectDisposedException: the client is already closed or not connected.
            DropClient(client);
        }
    }

    /// <summary>
    /// Completes a read, forwards the received bytes, and queues the next read.
    /// A zero-byte read or a connection failure closes and forgets the client.
    /// </summary>
    private void ClientReadCallback(IAsyncResult callbackResult)
    {
        ClientObject clientObj = (ClientObject)callbackResult.AsyncState;
        TcpClient client = clientObj.AsyncClient;
        try
        {
            int bytesRead = client.GetStream().EndRead(callbackResult);
            if (bytesRead <= 0)
            {
                // The remote side closed the connection.
                DropClient(client);
                return;
            }

            ProcessMessage(clientObj.Buffer, bytesRead);
            client.GetStream().BeginRead(clientObj.Buffer, 0, clientObj.Buffer.Length, ClientReadCallback, clientObj);
        }
        catch (IOException)
        {
            // Connection failure (for example a reset by the peer): treat it as a
            // disconnect so the exception is not left unhandled in the callback.
            DropClient(client);
        }
        catch (InvalidOperationException)
        {
            // Also covers ObjectDisposedException: the client was closed underneath us.
            DropClient(client);
        }
    }

    /// <summary>
    /// Queues the next asynchronous accept, unless the listener has been stopped.
    /// </summary>
    private void BeginNextAccept(TcpListener listener)
    {
        try
        {
            listener.BeginAcceptTcpClient(ListenerCallback, listener);
        }
        catch (InvalidOperationException)
        {
            // Listener stopped or disposed (ObjectDisposedException derives from this type).
        }
    }

    /// <summary>
    /// Deserializes one received chunk and hands the result to the receiver.
    /// </summary>
    private void ProcessMessage(byte[] buffer, int count)
    {
        // NOTE(review): TCP is a byte stream, so one read may hold part of a message
        // or several messages. This code treats each read as exactly one XML
        // document; chunks that do not deserialize are dropped.
        try
        {
            CommData data;
            using (MemoryStream ms = new MemoryStream(buffer, 0, count))
            {
                lock (dataDeserializer)
                {
                    data = dataDeserializer.Deserialize(ms) as CommData;
                }
            }
            lock (beaconTable)
            {
                beaconTable.BeaconReceived(data);
            }
        }
        catch (Exception ex)
        {
            // Still best-effort, but the failure is now visible instead of silently dropped.
            System.Diagnostics.Trace.TraceWarning("Discarding message that could not be processed: " + ex);
        }
    }

    /// <summary>
    /// Closes a client and removes it from the table of live clients.
    /// </summary>
    private void DropClient(TcpClient client)
    {
        client.Close();
        lock (ClientTable)
        {
            ClientTable.Remove(client);
        }
    }

    /// <summary>State carried through a client's chain of asynchronous reads.</summary>
    class ClientObject
    {
        public TcpClient AsyncClient;
        public byte[] Buffer;
    }

    /// <summary>
    /// Stops the listener and closes every client that is still connected.
    /// </summary>
    public void Dispose()
    {
        connectionListener.Stop();

        // Snapshot and clear under the lock, then close outside it: the callbacks
        // modify ClientTable under the same lock, so enumerating it unprotected
        // could fail with a "collection was modified" error.
        TcpClient[] clients;
        lock (ClientTable)
        {
            disposed = true;
            clients = new TcpClient[ClientTable.Count];
            ClientTable.CopyTo(clients);
            ClientTable.Clear();
        }

        foreach (TcpClient client in clients)
        {
            client.Close();
        }
    }
}