I have the below code:
var channel = Channel.CreateUnbounded<string>();
var consumers = Enumerable
    .Range(1, 5)
    .Select(consumerNumber =>
        Task.Run(async () =>
        {
            var rnd = new Random();
            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var item))
                {
                    Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
                }
            }
        }));
var producers = Enumerable
    .Range(1, 5)
    .Select(producerNumber =>
        Task.Run(async () =>
        {
            var rnd = new Random();
            for (var i = 0; i < 10; i++)
            {
                var t = $"Message {i}";
                Console.WriteLine($"Producing {t} on producer {producerNumber}");
                await channel.Writer.WriteAsync(t);
                await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
            }
        }));
await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete());
await Task.WhenAll(consumers);
This works as it should, except that I want it to consume while it is producing. The problem is that
await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete());
blocks the consumers from running until the producers have completed, and I can't think of a way to get both running at the same time.
The consumers and producers variables are of type IEnumerable<Task>. This is a deferred enumerable that needs to be materialized in order for the tasks to be created. In your code the consumers query is first enumerated by Task.WhenAll(consumers), which runs only after the producers have finished, so no consumer task exists before that point. You can materialize the enumerable by chaining the ToArray operator on the LINQ queries. By doing so, the type of the two variables will become Task[], which means that your tasks are instantiated and up and running.
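To make the deferred behavior concrete, here is a minimal sketch. The created counter and the empty task bodies are illustrative assumptions, not part of the original code; the point is only to show when the Select lambda actually runs.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

int created = 0; // hypothetical counter: how many times the Select lambda has run

// Deferred query: defining it does not invoke the lambda, so no task exists yet.
var deferred = Enumerable
    .Range(1, 5)
    .Select(n =>
    {
        created++; // executes only when the query is enumerated
        return Task.Run(() => { });
    });

Console.WriteLine(created); // prints 0

// ToArray enumerates the query once, so all five tasks are created and started here.
Task[] materialized = deferred.ToArray();

Console.WriteLine(created); // prints 5

await Task.WhenAll(materialized);
```

Note that enumerating the deferred query a second time would invoke the lambda again and start five more tasks, which is another reason to materialize it once and keep the array.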
As a side note, the ContinueWith method requires passing TaskScheduler.Default explicitly as an argument; otherwise you are at the mercy of whatever the TaskScheduler.Current may be (it might be the UI TaskScheduler, for example). This is the correct usage of ContinueWith:
await Task.WhenAll(producers)
    .ContinueWith(_ => channel.Writer.Complete(), TaskScheduler.Default);
Another problem is that any exceptions thrown by the producers will be swallowed, because the producer tasks themselves are not awaited. Only the continuation is awaited, which is unlikely to fail. To solve this problem, you could just ditch the primitive ContinueWith and use async-await composition instead (an async local function that awaits the producers and then completes the channel). In this case not even that is necessary. You could simply do this:
try { await Task.WhenAll(producers); }
finally { channel.Writer.Complete(); }
The channel will be completed after any outcome of the Task.WhenAll(producers) task, so the consumers will not get stuck.
A third problem is that a failure of some of the producers will cause the immediate termination of the current method, before the consumers are awaited. The consumer tasks will then become fire-and-forget tasks. I am leaving it to you to find how you can ensure that all tasks can be awaited, in all cases, before exiting the method either successfully or with an error.
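For readers who want a starting point, one possible approach is sketched below; it is not necessarily the solution the answer has in mind. It relies on the fact that Task.WhenAll completes only after every supplied task has completed, even when some of them fail. The local function name ProduceThenCompleteAsync, the shortened delays, and the reduced message count are assumptions made for illustration.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();

// ToArray materializes the query, so the consumers start immediately.
Task[] consumers = Enumerable
    .Range(1, 5)
    .Select(consumerNumber => Task.Run(async () =>
    {
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out var item))
            {
                Console.WriteLine($"Consuming {item} on consumer {consumerNumber}");
            }
        }
    }))
    .ToArray();

Task[] producers = Enumerable
    .Range(1, 5)
    .Select(producerNumber => Task.Run(async () =>
    {
        var rnd = new Random();
        for (var i = 0; i < 3; i++) // fewer messages than the original, for brevity
        {
            var t = $"Message {i}";
            Console.WriteLine($"Producing {t} on producer {producerNumber}");
            await channel.Writer.WriteAsync(t);
            await Task.Delay(rnd.Next(50)); // shortened delay, in milliseconds
        }
    }))
    .ToArray();

// Hypothetical helper: awaits the producers, then completes the channel on any outcome.
async Task ProduceThenCompleteAsync()
{
    try { await Task.WhenAll(producers); }
    finally { channel.Writer.Complete(); }
}

// A single await observes everything: it finishes only when the consumers and the
// producer wrapper have all completed, and it rethrows if any of them failed.
await Task.WhenAll(consumers.Append(ProduceThenCompleteAsync()));
```

Because the channel is completed in the finally block, the consumers exit their loops even if a producer throws, so the combined Task.WhenAll cannot hang waiting for them. Awaiting it surfaces only the first exception; the full set remains available on the Exception property of the task returned by Task.WhenAll if you keep a reference to it.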