Following this post, I have been playing with System.Threading.Channel
to get confident enough and use it in my production code, replacing the Threads/Monitor.Pulse/Wait
based approach I am currently using (described in the referred post).
Basically I created a sample with a bounded channel where I run a couple of producer tasks at the beginning and, without waiting, start my consumer tasks, which start pushing elements from the channel. After waiting for the producers tasks to complete, I then signal the channel as complete, so the consumer tasks can quit listening to new channel elements.
My channel is a Channel<Action>
, and in each action I increment the count for each given worker in the WorkDistribution
concurrent dictionary, and at the end of the sample I print it so I can check I consumed as many items as I expected, and also how did the channel distributed the actions between the consumers.
For some reason this "Work Distribution footer" is not printing the same number of items as the total items produced by producer tasks.
What am I missing ? Some of the variables present were added for the sole purpose of helping troubleshoot.
Here's the full code:
public class ChannelSolution
{
object LockObject = new object();
Channel<Action<string>> channel;
int ItemsToProduce;
int WorkersCount;
int TotalItemsProduced;
ConcurrentDictionary<string, int> WorkDistribution;
CancellationToken Ct;
public ChannelSolution(int workersCount, int itemsToProduce, int maxAllowedItems,
CancellationToken ct)
{
WorkersCount = workersCount;
ItemsToProduce = itemsToProduce;
channel = Channel.CreateBounded<Action<string>>(maxAllowedItems);
Console.WriteLine($"Created channel with max {maxAllowedItems} items");
WorkDistribution = new ConcurrentDictionary<string, int>();
Ct = ct;
}
async Task ProduceItems(int cycle)
{
for (var i = 0; i < ItemsToProduce; i++)
{
var index = i + 1 + (ItemsToProduce * cycle);
bool queueHasRoom;
var stopwatch = new Stopwatch();
stopwatch.Start();
do
{
if (Ct.IsCancellationRequested)
{
Console.WriteLine("exiting read loop - cancellation requested !");
break;
}
queueHasRoom = await channel.Writer.WaitToWriteAsync();
if (!queueHasRoom)
{
if (Ct.IsCancellationRequested)
{
Console.WriteLine("exiting read loop - cancellation"
+ " requested !");
break;
}
if (stopwatch.Elapsed.Seconds % 3 == 0)
Console.WriteLine("Channel reached maximum capacity..."
+ " producer waiting for items to be freed...");
}
}
while (!queueHasRoom);
channel.Writer.TryWrite((workerName) => action($"A{index}", workerName));
Console.WriteLine($"Channel has room, item {index} added"
+ $" - channel items count: [{channel.Reader.Count}]");
Interlocked.Increment(ref TotalItemsProduced);
}
}
List<Task> GetConsumers()
{
var tasks = new List<Task>();
for (var i = 0; i < WorkersCount; i++)
{
var workerName = $"W{(i + 1).ToString("00")}";
tasks.Add(Task.Run(async () =>
{
while (await channel.Reader.WaitToReadAsync())
{
if (Ct.IsCancellationRequested)
{
Console.WriteLine("exiting write loop - cancellation"
+ "requested !");
break;
}
if (channel.Reader.TryRead(out var action))
{
Console.WriteLine($"dequed action in worker [{workerName}]");
action(workerName);
}
}
}));
}
return tasks;
}
void action(string actionNumber, string workerName)
{
Console.WriteLine($"processing {actionNumber} in worker {workerName}...");
var secondsToWait = new Random().Next(2, 5);
Thread.Sleep(TimeSpan.FromSeconds(secondsToWait));
Console.WriteLine($"action {actionNumber} completed by worker {workerName}"
+ $" after {secondsToWait} secs! channel items left:"
+ $" [{channel.Reader.Count}]");
if (WorkDistribution.ContainsKey(workerName))
{
lock (LockObject)
{
WorkDistribution[workerName]++;
}
}
else
{
var succeeded = WorkDistribution.TryAdd(workerName, 1);
if (!succeeded)
{
Console.WriteLine($"!!! failed incremeting dic value !!!");
}
}
}
public void Summarize(Stopwatch stopwatch)
{
Console.WriteLine("--------------------------- Thread Work Distribution "
+ "------------------------");
foreach (var kv in this.WorkDistribution)
Console.WriteLine($"thread: {kv.Key} items consumed: {kv.Value}");
Console.WriteLine($"Total actions consumed: "
+ $"{WorkDistribution.Sum(w => w.Value)} - Elapsed time: "
+ $"{stopwatch.Elapsed.Seconds} secs");
}
public void Run(int producerCycles)
{
var stopwatch = new Stopwatch();
stopwatch.Start();
var producerTasks = new List<Task>();
Console.WriteLine($"Started running at {DateTime.Now}...");
for (var i = 0; i < producerCycles; i++)
{
producerTasks.Add(ProduceItems(i));
}
var consumerTasks = GetConsumers();
Task.WaitAll(producerTasks.ToArray());
Console.WriteLine($"-------------- Completed waiting for PRODUCERS -"
+ " total items produced: [{TotalItemsProduced}] ------------------");
channel.Writer.Complete(); //just so I can complete this demo
Task.WaitAll(consumerTasks.ToArray());
Console.WriteLine("----------------- Completed waiting for CONSUMERS "
+ "------------------");
//Task.WaitAll(GetConsumers().Union(producerTasks/*.Union(
// new List<Task> { taskKey })*/).ToArray());
//Console.WriteLine("Completed waiting for tasks");
Summarize(stopwatch);
}
}
And here is the calling code in Program.cs
var workersCount = 5;
var itemsToProduce = 10;
var maxItemsInQueue = 5;
var cts = new CancellationTokenSource();
var producerConsumerTests = new ProducerConsumerTests(workersCount, itemsToProduce,
maxItemsInQueue, cts.Token);
producerConsumerTests.Run(2);
From a quick look there is a race condition in the ProduceItems
method, around the queueHasRoom
variable. You don't need this variable. The channel.Writer.TryWrite
method will tell you whether there is room in the channel's buffer or not. Alternatively you could simply await
the WriteAsync
method, instead of using the WaitToWriteAsync
/TryWrite
combo. AFAIK this combo is intended as a performance optimization of the former method. If you absolutely need to know whether there is available space before attempting to post a value, then the Channel<T>
is probably not a suitable container for your use case. You'll need to find something that can be locked during the whole operation of "check-for-available-space -> create-the-value -> post-the-value", so that this operation can be made atomic.
As a side note, using a lock to protect the updating of the ConcurrentDictionary
is redundant. The ConcurrentDictionary
offers the AddOrUpdate
method, that can replace atomically a value it contains with another value. You may had to lock if the dictionary contained mutable objects, and you needed to mutate that objects with thread-safety. But in your case the values are of type Int32
, which is an immutable struct. You don't change it, you just replace it with a new Int32
, which is created based on the existing value:
WorkDistribution.AddOrUpdate(workerName, 1, (_, existing) => existing + 1);