Tags: c#, performance, producer-consumer, concurrent-queue, system.threading.channels

When should System.Threading.Channels be preferred to ConcurrentQueue?


I recently built a producer/consumer system using ConcurrentQueue<T> and SemaphoreSlim. I then built an alternative using the newer System.Threading.Channels API (Channel<T>).

I benchmarked both with BenchmarkDotNet by writing 1000 items 1000 times into each system (and waiting for the reader to finish), and got the following results:

|      Method | ItemsCount | Iterations |        Mean |       Error |      StdDev |      Median |  Allocated |
|------------ |----------- |----------- |------------:|------------:|------------:|------------:|-----------:|
|     MyQueue |       1000 |       1000 | 19,379.4 us | 1,230.30 us | 3,569.33 us | 18,735.6 us | 8235.02 KB |
|   MyChannel |       1000 |       1000 | 45,858.2 us | 1,298.42 us | 3,704.46 us | 45,689.2 us |   72.11 KB |

The ConcurrentQueue implementation seems to be significantly faster than the Channel.

I tried setting SingleReader and SingleWriter to true on the Channel, but results ended up being worse:

|      Method | ItemsCount | Iterations |        Mean |       Error |      StdDev |      Median |  Allocated |
|------------ |----------- |----------- |------------:|------------:|------------:|------------:|-----------:|
|     MyQueue |       1000 |       1000 | 18,578.7 us | 1,238.46 us | 3,493.10 us | 18,192.7 us | 8236.31 KB |
|   MyChannel |       1000 |       1000 | 50,506.9 us | 1,383.73 us | 3,857.28 us | 49,635.8 us |  170.73 KB |

I'm not sure whether there is a flaw in my implementation or in the benchmark itself. If there is not, and these results are valid, when should Channels be preferred over a plain ConcurrentQueue?

The simplified code of both classes looks like this:

public class MyQueue
{
    ConcurrentQueue<Item> _queue;
    SemaphoreSlim _readerFinishedSemaphore;
    SemaphoreSlim _readSemaphore;

    bool _completed = false; // set by CompleteAndWaitForReader, checked by Reader

    public void Setup()
    {
        _queue = new();
        _readerFinishedSemaphore = new(0);
        _readSemaphore = new(0);

        var task = new Task(Reader, TaskCreationOptions.LongRunning);
        task.Start();
    }

    private async void Reader()
    {
        while (true)
        {
            await _readSemaphore.WaitAsync();
            while (_queue.TryDequeue(out var item))
            {
                // do stuff ...
            }

            if (_completed) break;
        }

        _readerFinishedSemaphore.Release();
    }

    public void Write(IList<Item> items)
    {
        foreach (var i in items)
        {
            _queue.Enqueue(i);
        }

        _readSemaphore.Release();
    }

    public void CompleteAndWaitForReader()
    {
        _completed = true;
        _readSemaphore.Release();
        _readerFinishedSemaphore.Wait();
    }
}

And for Channels:

public class MyChannel
{
    Channel<Item> _channel = null!;
    SemaphoreSlim _readerFinishedSemaphore = null!;

    public void Setup()
    {
        _readerFinishedSemaphore = new(0);
        _channel = Channel.CreateUnbounded<Item>();

        var task = new Task(Reader, TaskCreationOptions.LongRunning);
        task.Start();
    }

    private async void Reader()
    {
        var reader = _channel.Reader;

        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var item))
            {
                // do stuff ...
            }
        }

        _readerFinishedSemaphore.Release();
    }

    public void Write(IList<Item> items)
    {
        foreach (var i in items)
        {
            _channel.Writer.TryWrite(i);
        }
    }

    public void CompleteAndWaitForReader()
    {
        _channel.Writer.Complete();
        _readerFinishedSemaphore.Wait();
    }
}

The benchmarking code looks like this:

// items are generated in [GlobalSetup] using fixed-seed Random class

[IterationSetup]
public void IterationSetup()
{
    myChannel = new MyChannel();
    myQueue = new MyQueue();

    myChannel.Setup();
    myQueue.Setup();
}

[Benchmark]
public void MyQueue()
{
    for (int i = 0; i < Iterations; i++)
        myQueue.Write(items);

    myQueue.CompleteAndWaitForReader();
}

// same for MyChannel

Note that I am running this on .NET 8.0.0-preview.6.23329.4.


Solution

  • The main reason the ConcurrentQueue<T> implementation was faster is that it signalled the reader only once per batch of 1000 added items, while the Channel<T> implementation did so for every item.

    When I made the comparison fairer by adding the 1000 items one by one (signalling after each), the results were practically identical:

    |    Method | ItemsCount |     Mean |    Error |   StdDev |   Median | Allocated |
    |---------- |----------- |---------:|---------:|---------:|---------:|----------:|
    |   MyQueue |       1000 | 163.8 us | 22.09 us | 64.44 us | 144.8 us |   8.42 KB |
    | MyChannel |       1000 | 163.2 us | 14.02 us | 41.12 us | 177.9 us |   5.48 KB |
    

    At higher item counts the difference grew in favor of the Channel<T> implementation, both in time and, notably, in allocations:

    |    Method | ItemsCount |      Mean |     Error |    StdDev |    Median | Allocated |
    |---------- |----------- |----------:|----------:|----------:|----------:|----------:|
    |   MyQueue |      10000 |  1.668 ms | 0.1971 ms | 0.5811 ms |  1.841 ms |  16.67 KB |
    | MyChannel |      10000 |  1.163 ms | 0.1090 ms | 0.3197 ms |  1.121 ms |   9.92 KB |
    |   MyQueue |     100000 | 10.906 ms | 1.1151 ms | 3.1995 ms | 11.850 ms |  65.17 KB |
    | MyChannel |     100000 |  6.678 ms | 0.2506 ms | 0.7026 ms |  6.653 ms |   9.92 KB |
    

    So I guess I will be sticking to Channel<T> for general producer/consumer scenarios.
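
The adjusted benchmark code is not shown above, so the following is only a minimal sketch of what "adding items one by one" might look like for both variants. The class name PerItemSignalling, the method names, and the use of int items and Task.Run are assumptions made for illustration, not the original code. The queue reader also reads the completion flag before draining (unlike the question's code) so that an item enqueued just before completion cannot be missed.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper, not the original benchmark classes.
public static class PerItemSignalling
{
    // Queue variant: Release() after every Enqueue, so the reader is
    // signalled per item rather than once per 1000-item batch.
    public static async Task<int> RunQueueAsync(int itemCount)
    {
        var queue = new ConcurrentQueue<int>();
        var signal = new SemaphoreSlim(0);
        var completed = false;

        var reader = Task.Run(async () =>
        {
            var consumed = 0;
            while (true)
            {
                await signal.WaitAsync();
                // Read the flag first: if it is already set, every item has
                // been enqueued, so the drain below sees all of them.
                var done = Volatile.Read(ref completed);
                while (queue.TryDequeue(out _)) consumed++;
                if (done) return consumed;
            }
        });

        for (var i = 0; i < itemCount; i++)
        {
            queue.Enqueue(i);
            signal.Release(); // one signal per item
        }

        Volatile.Write(ref completed, true);
        signal.Release(); // final wake-up so the reader can observe completion
        return await reader;
    }

    // Channel variant: TryWrite per item; Complete() ends the read loop.
    public static async Task<int> RunChannelAsync(int itemCount)
    {
        var channel = Channel.CreateUnbounded<int>();

        var reader = Task.Run(async () =>
        {
            var consumed = 0;
            while (await channel.Reader.WaitToReadAsync())
            {
                while (channel.Reader.TryRead(out _)) consumed++;
            }
            return consumed;
        });

        for (var i = 0; i < itemCount; i++)
        {
            channel.Writer.TryWrite(i);
        }

        channel.Writer.Complete();
        return await reader;
    }
}
```

Calling await PerItemSignalling.RunQueueAsync(1000) or await PerItemSignalling.RunChannelAsync(1000) returns the number of items the reader consumed. With per-item signalling both variants do comparable work per write, which is consistent with the near-identical timings at 1000 items reported above.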