I am experimenting with System.Threading.Channels
. And I noticed that throttling feature did not work as documented. Here is a very simple example:
public class Example
{
private readonly Channel<int> queue = Channel.CreateBounded<int>(1);
public void Produce()
{
ChannelWriter<int> writer = queue.Writer;
Console.WriteLine("Writing first value...");
writer.WriteAsync(7);
Console.WriteLine("Writing second value...");
writer.WriteAsync(13);
Console.WriteLine("Finishing writing...");
writer.Complete();
}
public async Task Consume()
{
await Task.Delay(5000);
ChannelReader<int> reader = queue.Reader;
await foreach (int value in reader.ReadAllAsync())
Console.WriteLine(value);
Console.WriteLine("Done...");
}
}
Here is how I run it:
var item = new Example();
var task1 = Task.Run(() => item.Produce());
var task2 = Task.Run(() => item.Consume());
Task.WaitAll(task1, task2);
And here is what I get as a result:
Writing first value...
Writing second value...
Finishing writing...
7
Done...
Channel's writer is supposed to wait for the first value to be processed. But it looses second value instead. Could you please help me understand what am I missing?
Console.WriteLine("Writing first value...");
writer.WriteAsync(7);
The WriteAsync
adds the item asynchronously, meaning that the item will be added when the bounded channel has space available for it. You are supposed to await
the calls to WriteAsync
, because they return a ValueTask
that will complete when the item is added. In your case this task most likely completed with some kind of exception, because the channel is Complete
before the WriteAsync
operation had a chance to complete successfully. Since you didn't await
the task, the error was not observed.