Tags: c#, .net, asynchronous, async-await, system.threading.channels

.NET Problem using System.Threading.Channels.Channel efficiently


Consider the following code:

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions());

var t1 = Task.Run(async () =>
{
    DateTime start = DateTime.Now;

    for (int i = 0; i < 100000000; i++)
    {
        await channel.Writer.WriteAsync(i);
    }

    Console.WriteLine($"Writer took {DateTime.Now - start}");
    channel.Writer.Complete();
});

var t2 = Task.Run(async () =>
{
    while (true)
    {
        try
        {
            int r = await channel.Reader.ReadAsync();
        }
        catch (ChannelClosedException) { break; }
    }
});

await Task.WhenAll(t1, t2);

This takes about 10 seconds, i.e. it outputs something like "Writer took 00:00:10.276747". If I comment out the whole while block, it takes about 6 seconds. This is consistent over multiple runs.

Question: if Channel is supposed to be an efficient producer/consumer mechanism, why does consuming in this case affect the producer?

More curiously, if I add these two methods:

static async Task Produce(Channel<int> channel)
{
    DateTime start = DateTime.Now;

    for (int i = 0; i < 100000000; i++)
    {
        await channel.Writer.WriteAsync(i);
    }

    Console.WriteLine($"Writer took {DateTime.Now - start}");
    channel.Writer.Complete();
}

static async Task Consume(Channel<int> channel)
{
    while (true)
    {
        try
        {
            int r = await channel.Reader.ReadAsync();
        }
        catch (ChannelClosedException) { break; }
    }
}

and then do:

var t1 = Produce(channel);
var t2 = Consume(channel);
await Task.WhenAll(t1, t2);

They finish in around 6 seconds either way, whether the while block is commented out or not.

Question: Why does involving an explicit thread with Task.Run affect the efficiency?


Solution

  • This is an interesting question, but not because of any lack of efficiency. In fact, the question's numbers show that channels are very efficient. Writing to an unbounded channel involves:

    1. Writing to an internal ConcurrentQueue, and
    2. Waking one of the (possibly many) waiting readers.

    This means that enqueuing and waking a reader takes only about 66% longer than simply enqueuing into a ConcurrentQueue (roughly 10 s vs. 6 s in the question). That's not bad at all. Unfortunately, that number is deceptive, especially in this case, where each Task or ValueTask is larger than the int payload and the "work" done per item is negligible.
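    When the payload is this small, the per-item async overhead dominates. A common way to reduce it on the consumer side (not something the benchmarks in this answer measure) is to await WaitToReadAsync only when the channel is empty, then drain everything already buffered with TryRead. A minimal sketch, assuming a single consumer and an arbitrary 1M items; DrainAsync is a hypothetical helper name:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Assumption: 1M items, far fewer than the question's 100M, to keep the run short.
const int ItemCount = 1_000_000;

var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = true });

var producer = Task.Run(async () =>
{
    for (int i = 0; i < ItemCount; i++)
    {
        // An unbounded channel never has to wait for free space.
        await channel.Writer.WriteAsync(i);
    }
    channel.Writer.Complete();
});

var consumer = Task.Run(() => DrainAsync(channel.Reader));

await Task.WhenAll(producer, consumer);
long total = await consumer;
Console.WriteLine($"Consumed sum: {total}");

// Hypothetical helper: awaits only when nothing is buffered,
// then reads all currently buffered items synchronously.
static async Task<long> DrainAsync(ChannelReader<int> reader)
{
    long sum = 0;
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out int item))
        {
            sum += item;
        }
    }
    return sum;
}
```

    WaitToReadAsync returns false once the writer has completed and the buffer is empty, so the loop ends without relying on ChannelClosedException.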

    Benchmark libraries like BenchmarkDotNet run tests many times until they get a statistically stable sample, with warmup and cooldown steps to account for JIT compilation, caching and similar effects.
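    To illustrate why a single DateTime.Now reading is fragile, here is a deliberately crude Stopwatch harness with warmup runs followed by several measured runs. It is only a sketch of the idea and no replacement for BenchmarkDotNet; the MeasureMedian and ProduceOnly names and the iteration counts are hypothetical choices.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Channels;
using System.Threading.Tasks;

// Assumption: 2 warmup runs, 5 measured runs, 1M items per run.
TimeSpan median = await MeasureMedian(() => ProduceOnly(1_000_000), warmups: 2, runs: 5);
Console.WriteLine($"Median of measured runs: {median}");

static async Task<TimeSpan> MeasureMedian(Func<Task> action, int warmups, int runs)
{
    // Warmup runs make sure the code has been JIT-compiled before timing starts.
    for (int i = 0; i < warmups; i++)
    {
        await action();
    }

    var samples = new List<TimeSpan>(runs);
    for (int i = 0; i < runs; i++)
    {
        // Stopwatch is a high-resolution timer, unlike DateTime.Now.
        var sw = Stopwatch.StartNew();
        await action();
        sw.Stop();
        samples.Add(sw.Elapsed);
    }

    // The median is less sensitive to a single outlier (e.g. a GC pause) than the mean.
    samples.Sort();
    return samples[samples.Count / 2];
}

// Hypothetical workload: write `count` items to a fresh unbounded channel, no consumer.
static async Task ProduceOnly(int count)
{
    var channel = Channel.CreateUnbounded<int>();
    for (int i = 0; i < count; i++)
    {
        await channel.Writer.WriteAsync(i);
    }
    channel.Writer.Complete();
}
```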

    To get a baseline, I used BenchmarkDotNet with this benchmark class. I couldn't resist adding a parameter for the SingleReader optimization, which assumes there is only ever a single reader at a time and can therefore use a simpler queue and locking.

    using System.Threading.Channels;
    using System.Threading.Tasks;
    using BenchmarkDotNet.Attributes;
    
    [MemoryDiagnoser]
    [ThreadingDiagnoser]
    public class QuestionBenchmarks
    {
        [Params(true, false)] // run every benchmark with and without the SingleReader optimization
        public bool SingleReader;
    
        static async Task Produce(Channel<int> channel)
        {
            for (int i = 0; i < 100000000; i++)
            {
                await channel.Writer.WriteAsync(i);
            }
            channel.Writer.Complete();
        }
    
        static async Task Consume(Channel<int> channel)
        {
            while (true)
            {
                try
                {
                    int r = await channel.Reader.ReadAsync();
                }
                catch (ChannelClosedException) { break; }
            }
        }
    
        [Benchmark]
        public async Task InlinedBoth()
        {
            var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = SingleReader });
    
            var t1 = Task.Run(async () =>
            {
                for (int i = 0; i < 100000000; i++)
                {
                    await channel.Writer.WriteAsync(i);
                }
    
                channel.Writer.Complete();
            });
    
            var t2 = Task.Run(async () =>
            {
                while (true)
                {
                    try
                    {
                        int r = await channel.Reader.ReadAsync();
                    }
                    catch (ChannelClosedException) { break; }
                }
            });
    
            await Task.WhenAll(t1, t2);
        }
    
       
        [Benchmark]
        public async Task InlinedProduceOnly()
        {
            var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = SingleReader });
    
            var t1 = Task.Run(async () =>
            {
                for (int i = 0; i < 100000000; i++)
                {
                    await channel.Writer.WriteAsync(i);
                }
    
                channel.Writer.Complete();
            });
    
            await t1;
        }
    
    
        [Benchmark]
        public async Task WithMethods()
        {
            var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = SingleReader });
    
            var producer = Produce(channel);
            var consumer = Consume(channel);
    
            await Task.WhenAll(producer, consumer);
        }
    
        [Benchmark]
        public async Task WithMethodsProduceOnly()
        {
            var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleReader = SingleReader });
    
            var producer = Produce(channel);
    
            await producer;
        }
    
    }
    

    And got a big surprise, one that should have been expected:

    // * Summary *
    
    BenchmarkDotNet=v0.13.1, OS=Windows 10.0.22621
    Intel Core i7-10850H CPU 2.70GHz, 1 CPU, 12 logical and 6 physical cores
    .NET SDK=7.0.100-preview.5.22307.18
      [Host]     : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT
      DefaultJob : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT
    

    And these results:

    | Method                 | SingleReader | Mean    | Error    | StdDev   | Completed Work Items | Lock Contentions | Gen 0     | Gen 1     | Gen 2     | Allocated  |
    |------------------------|--------------|---------|----------|----------|----------------------|------------------|-----------|-----------|-----------|------------|
    | InlinedBoth            | False        | 4.193 s | 0.0825 s | 0.0772 s | 9675.0000            | 32071.0000       | -         | -         | -         | 265 KB     |
    | InlinedProduceOnly     | False        | 3.842 s | 0.0768 s | 0.1654 s | 1.0000               | -                | 4000.0000 | 4000.0000 | 4000.0000 | 786,464 KB |
    | WithMethods            | False        | 6.181 s | 0.1233 s | 0.2027 s | -                    | -                | 4000.0000 | 4000.0000 | 4000.0000 | 786,463 KB |
    | WithMethodsProduceOnly | False        | 3.805 s | 0.0753 s | 0.0837 s | -                    | -                | 4000.0000 | 4000.0000 | 4000.0000 | 786,462 KB |
    | InlinedBoth            | True         | 4.342 s | 0.1200 s | 0.3483 s | 84484.0000           | 22848.0000       | -         | -         | -         | 71 KB      |
    | InlinedProduceOnly     | True         | 2.990 s | 0.0595 s | 0.0908 s | 1.0000               | -                | 3000.0000 | 3000.0000 | 3000.0000 | 393,230 KB |
    | WithMethods            | True         | 4.158 s | 0.0814 s | 0.1426 s | -                    | -                | 3000.0000 | 3000.0000 | 3000.0000 | 393,230 KB |
    | WithMethodsProduceOnly | True         | 2.879 s | 0.0547 s | 0.0512 s | -                    | -                | 3000.0000 | 3000.0000 | 3000.0000 | 393,232 KB |

    Completed Work Items is the number of work items processed by the ThreadPool. The benchmarks with methods don't use the ThreadPool at all, which of course they don't, since they don't use Task.Run! The code that uses methods doesn't use multiple threads, so there is no lock contention. The same goes for the code that has no consumer.

    This means the benchmarks can't be compared directly. Even so, it's obvious that SingleReader uses less memory: about half, or less, in every pair of runs.
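    One way to see why the method-based versions show no ThreadPool work and no lock contention: on an unbounded channel, WriteAsync can always complete immediately (unless the channel is completed or the token is cancelled), so it returns an already-completed ValueTask and the await inside Produce never yields. Calling Produce(channel) therefore runs the whole loop synchronously on the calling thread and completes the writer before Consume(channel) is even called, which would also be consistent with the large Allocated values for those runs. The sketch below checks this with an arbitrary 1M items; it is modeled on the question's Produce/Consume methods, with Consume using ReadAllAsync instead of catching ChannelClosedException, and is not part of the original benchmark.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Assumption: 1M items instead of the question's 100M.
const int ItemCount = 1_000_000;

var channel = Channel.CreateUnbounded<int>();

// Called directly, without Task.Run, just like the question's second version.
Task producer = Produce(channel, ItemCount);

// Every WriteAsync completed synchronously, so the producer task is already done
// (and the writer already completed) before the consumer has been started.
bool producedBeforeConsuming = producer.IsCompletedSuccessfully;
Console.WriteLine($"Producer finished before Consume was called: {producedBeforeConsuming}");

int consumed = await Consume(channel);
await producer;
Console.WriteLine($"Items consumed afterwards: {consumed}");

static async Task Produce(Channel<int> channel, int count)
{
    for (int i = 0; i < count; i++)
    {
        // Unbounded channel: the write completes immediately, so this await never yields.
        await channel.Writer.WriteAsync(i);
    }
    channel.Writer.Complete();
}

static async Task<int> Consume(Channel<int> channel)
{
    int count = 0;
    // ReadAllAsync ends once the writer is completed and the channel is drained.
    await foreach (int item in channel.Reader.ReadAllAsync())
    {
        count++;
    }
    return count;
}
```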

    The entire benchmark with the 100M items took 28 minutes, so I'll wait a bit before creating a new, correct benchmark with far fewer items.

    Global total time: 00:28:23 (1703.72 sec), executed benchmarks: 10