c# multithreading performance network-programming blockingcollection

Scaling Connections with BlockingCollection<T>()


I have a server that communicates with 50 or more devices over TCP on a LAN. Each socket has its own message-reading loop, started with Task.Run.

I buffer each message read into a blocking queue, one queue per socket, and each queue has its own Task.Run consumer that calls BlockingCollection.Take().

So something like (semi-pseudocode):

Socket Reading Task

Task.Run(() =>
{
    while (notCancelled)
    {
        element = ReadXml();
        switch (element)
        {
            case messageheader:
                MessageQueue.Add(deserialize<messageType>());
            ...
        }
    }
});

Message Buffer Task

Task.Run(() =>
{
    while (notCancelled)
    {
        Process(MessageQueue.Take());
    }
});

So that would make 50+ reading tasks and 50+ tasks blocking on their own buffers.

I did it this way to avoid blocking the reading loop and allow the program to distribute processing time on messages more fairly, or so I believe.

Is this an inefficient way to handle it? What would be a better way?


Solution

  • You may be interested in the "channels" work, in particular: System.Threading.Channels. Its aim is to provide asynchronous producer/consumer queues, covering both single and multiple producer and consumer scenarios, bounded capacity, etc. By using an asynchronous API, you aren't tying up lots of threads that are just waiting for something to do.

    Your read loop would become:

    while (notCancelled) {
        var next = await queue.Reader.ReadAsync(optionalCancellationToken);
        Process(next);
    }
    

    and the producer:

    switch (element)
    {
        case messageheader:
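            // TryWrite returns false if the channel is bounded and full (or completed);
            // with a bounded channel, check the result or use WriteAsync instead.
            queue.Writer.TryWrite(deserialize<messageType>());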
            ...
    }
    

    So: minimal changes.
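
    To make the two fragments above concrete, here is a minimal self-contained sketch of one channel per device, with an asynchronous consumer in place of the blocking Take() loop. The names ChannelSketch and RunAsync, the string message type, and the simulated producer loop are all assumptions for illustration; in the real server the producer would be the socket read loop and the counter would be a call to Process.

    ```csharp
    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    public static class ChannelSketch
    {
        // Hypothetical driver: one producer and one consumer per simulated device.
        // Returns the total number of messages consumed across all devices.
        public static async Task<int> RunAsync(int deviceCount, int messagesPerDevice)
        {
            int processed = 0;

            var perDevice = Enumerable.Range(0, deviceCount).Select(async deviceId =>
            {
                // One unbounded queue per device, with a single writer and a single reader.
                var queue = Channel.CreateUnbounded<string>(
                    new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });

                // Consumer: awaits while the queue is empty, so no thread is parked.
                var consumer = Task.Run(async () =>
                {
                    while (await queue.Reader.WaitToReadAsync())
                    {
                        while (queue.Reader.TryRead(out string message))
                        {
                            Interlocked.Increment(ref processed); // stand-in for Process(message)
                        }
                    }
                });

                // Producer: stand-in for the socket read loop (assumption).
                for (int i = 0; i < messagesPerDevice; i++)
                {
                    await queue.Writer.WriteAsync($"device {deviceId} message {i}");
                }

                // Completing the writer makes WaitToReadAsync return false once drained.
                queue.Writer.Complete();
                await consumer;
            });

            await Task.WhenAll(perDevice);
            return processed;
        }
    }
    ```

    Calling `await ChannelSketch.RunAsync(50, 20)` would push 20 messages through each of 50 channels. If a slow consumer could let a queue grow without limit, Channel.CreateBounded with a capacity is the usual alternative, at which point WriteAsync (which waits for space) is safer than TryWrite.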


    Alternatively - or in combination - you could look into things like "pipelines" (https://www.nuget.org/packages/System.IO.Pipelines/) - since you're dealing with TCP data, this would be an ideal fit, and is something I've looked at for the custom web-socket server here on Stack Overflow (which deals with huge numbers of connections). Since the API is async throughout, it does a good job of balancing work - and the pipelines API is engineered with typical TCP scenarios in mind, for example partially consuming incoming data streams as you detect frame boundaries. I've written about this usage a lot, with code examples mostly here. Note that "pipelines" doesn't include a direct TCP layer, but the "kestrel" server includes one, or the third-party library https://www.nuget.org/packages/Pipelines.Sockets.Unofficial/ does (disclosure: I wrote it).
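
    As a rough illustration of "partially consuming incoming data streams as you detect frame boundaries", the sketch below reads newline-delimited frames from a PipeReader. The newline delimiter, the PipelineSketch class, and the in-memory Pipe used in DemoAsync are assumptions for the example; the question's XML messages would need their own boundary detection, and a real server would feed the pipe from a socket.

    ```csharp
    using System;
    using System.Buffers;
    using System.Collections.Generic;
    using System.IO.Pipelines;
    using System.Text;
    using System.Threading.Tasks;

    public static class PipelineSketch
    {
        // Reads newline-delimited frames until the writer side completes.
        public static async Task<List<string>> ReadFramesAsync(PipeReader reader)
        {
            var frames = new List<string>();
            while (true)
            {
                ReadResult result = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = result.Buffer;

                // Take every complete frame that is currently buffered.
                while (TryReadFrame(ref buffer, out ReadOnlySequence<byte> frame))
                {
                    frames.Add(Encoding.UTF8.GetString(frame.ToArray()));
                }

                // Consumed: up to the start of the leftover partial frame.
                // Examined: everything, so the next ReadAsync waits for new data.
                reader.AdvanceTo(buffer.Start, buffer.End);

                if (result.IsCompleted) break;
            }
            await reader.CompleteAsync();
            return frames;
        }

        // Slices one frame off the front of the buffer if a delimiter is present.
        private static bool TryReadFrame(ref ReadOnlySequence<byte> buffer,
                                         out ReadOnlySequence<byte> frame)
        {
            SequencePosition? newline = buffer.PositionOf((byte)'\n');
            if (newline == null)
            {
                frame = default;
                return false;
            }
            frame = buffer.Slice(0, newline.Value);
            buffer = buffer.Slice(buffer.GetPosition(1, newline.Value)); // skip the '\n'
            return true;
        }

        // Hypothetical demo: the second frame is split across two writes.
        public static async Task<List<string>> DemoAsync()
        {
            var pipe = new Pipe();
            Task<List<string>> reading = ReadFramesAsync(pipe.Reader);
            await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("hello\nwor"));
            await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("ld\n"));
            await pipe.Writer.CompleteAsync();
            return await reading;
        }
    }
    ```

    DemoAsync returns the two frames "hello" and "world", even though the second one arrives in two pieces. The partial bytes stay in the pipe between reads, so the read loop needs no reassembly buffer of its own.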