I observed a strange behavior while experimenting with the Parallel.ForEach method and the BlockingCollection<T> class. Apparently calling the two lines below on a separate thread is enough to cause an ever-increasing number of threads in the ThreadPool:
var queue = new BlockingCollection<int>();
Parallel.ForEach(queue.GetConsumingEnumerable(), _ => { });
The queue contains no elements. I was expecting that the Parallel.ForEach loop would be idle, waiting for items to be added to the queue. Apparently it's not idle, because the ThreadPool.ThreadCount is increasing by one every second. Here is a minimal demo:
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public class Program
{
    public static void Main()
    {
        new Thread(() =>
        {
            var queue = new BlockingCollection<int>();
            Parallel.ForEach(queue.GetConsumingEnumerable(), _ => { });
        })
        { IsBackground = true }.Start();

        Stopwatch stopwatch = Stopwatch.StartNew();
        while (true)
        {
            Console.WriteLine($"ThreadCount: {ThreadPool.ThreadCount}");
            if (stopwatch.ElapsedMilliseconds > 8000) break;
            Thread.Sleep(1000);
        }
        Console.WriteLine("Finished");
    }
}
Output:
ThreadCount: 0
ThreadCount: 4
ThreadCount: 5
ThreadCount: 6
ThreadCount: 7
ThreadCount: 8
ThreadCount: 10
ThreadCount: 11
ThreadCount: 12
Finished
Can anyone explain why this is happening, and how to prevent it? Ideally I would like the Parallel.ForEach to consume at most one ThreadPool thread while the queue is empty.
I am searching for a solution applicable to .NET Core 3.1 and later.
Reason
The Parallel.ForEach method partitions the enumerable first, then uses TaskReplicator.Run to run the action on each partition.
The actual runner is the method Replica.Execute. At the beginning of that method, because the enumerable is blocked (GetConsumingEnumerable never yields while the queue is empty), the replica doesn't stop; it keeps creating new replicas until _remainingConcurrency reaches 0.
The _remainingConcurrency field is initialized from the ParallelOptions.EffectiveMaxConcurrencyLevel option, whose default value is -1, which means unlimited. That is why the ThreadCount keeps increasing.
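For reference, here is a minimal sketch (assuming .NET Core 3.1 or later) that prints the default MaxDegreeOfParallelism, the public setting from which EffectiveMaxConcurrencyLevel appears to be derived and which leaves the replica count unbounded:
using System;
using System.Threading.Tasks;

public class DefaultsDemo
{
    public static void Main()
    {
        // The default is -1, meaning "unlimited": the loop may keep adding replicas,
        // and the ThreadPool keeps injecting threads to run them.
        var options = new ParallelOptions();
        Console.WriteLine($"MaxDegreeOfParallelism: {options.MaxDegreeOfParallelism}");
    }
}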
Solution
Because we know the reason, we just need to specify a reasonable ParallelOptions value that limits the number of replicas (and therefore ThreadPool threads) the loop can create.
var option = new ParallelOptions
{
    MaxDegreeOfParallelism = 5
};
Parallel.ForEach(queue.GetConsumingEnumerable(), option, _ => { });
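Note that even with the cap, the loop still blocks up to MaxDegreeOfParallelism ThreadPool threads while the queue is empty; the option only stops the unbounded growth. As a variant (my own suggestion, not part of the original answer), a common choice is to cap parallelism at the machine's logical processor count:
// Sketch: cap the loop at Environment.ProcessorCount instead of a fixed 5.
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount
};
Parallel.ForEach(queue.GetConsumingEnumerable(), options, _ => { });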