Tags: c#, multithreading, threadpool, parallel.foreach, blockingcollection

Parallel.ForEach on a BlockingCollection causes steady increase of threads


I observed some strange behavior while experimenting with the Parallel.ForEach method and the BlockingCollection<T> class. Apparently, running the two lines below on a separate thread is enough to cause an ever-increasing number of threads in the ThreadPool:

var queue = new BlockingCollection<int>();
Parallel.ForEach(queue.GetConsumingEnumerable(), _ => { });

The queue contains no elements. I expected the Parallel.ForEach loop to be idle, waiting for items to be added to the queue. Apparently it is not idle, because ThreadPool.ThreadCount increases by roughly one every second. Here is a minimal demo:

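using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public class Program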
{
    public static void Main()
    {
        new Thread(() =>
        {
            var queue = new BlockingCollection<int>();
            Parallel.ForEach(queue.GetConsumingEnumerable(), _ => { });
        })
        { IsBackground = true }.Start();

        Stopwatch stopwatch = Stopwatch.StartNew();
        while (true)
        {
            Console.WriteLine($"ThreadCount: {ThreadPool.ThreadCount}");
            if (stopwatch.ElapsedMilliseconds > 8000) break;
            Thread.Sleep(1000);
        }
        Console.WriteLine("Finished");
    }
}

Output:

ThreadCount: 0
ThreadCount: 4
ThreadCount: 5
ThreadCount: 6
ThreadCount: 7
ThreadCount: 8
ThreadCount: 10
ThreadCount: 11
ThreadCount: 12
Finished

Can anyone explain why this is happening, and how to prevent it? Ideally I would like the Parallel.ForEach to consume at most one ThreadPool thread while the queue is empty.

I am looking for a solution applicable to .NET Core 3.1 and later.


Solution

  • Reason

    The Parallel.ForEach method first partitions the enumerable, then uses TaskReplicator.Run to run the action on each partition.

    The actual runner is the method Replica.Execute. At the beginning of that method, a replica creates another replica of itself. Because the enumerable is blocked, no replica ever finishes, so new replicas keep being created until _remainingConcurrency reaches 0.

    The _remainingConcurrency field is initialized from ParallelOptions.EffectiveMaxConcurrencyLevel. Its default value is -1, which means unlimited. That is why you saw the ThreadCount keep increasing.
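
The default mentioned above can be checked from the public side. The following is a minimal sketch, assuming that the internal EffectiveMaxConcurrencyLevel follows the public MaxDegreeOfParallelism property when the default scheduler is used; the class name DefaultOptionsCheck and its helper method are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper class, for illustration only.
public static class DefaultOptionsCheck
{
    // Reads the documented default of ParallelOptions.MaxDegreeOfParallelism.
    // A value of -1 means "no limit on the number of concurrent operations".
    public static int DefaultMaxDegreeOfParallelism()
    {
        return new ParallelOptions().MaxDegreeOfParallelism;
    }

    public static void Main()
    {
        Console.WriteLine(DefaultMaxDegreeOfParallelism()); // prints -1
    }
}
```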

    Solution

    Now that we know the reason, we just need to specify a reasonable option that limits the degree of parallelism.

    var option = new ParallelOptions
    {
        MaxDegreeOfParallelism = 5
    };
    Parallel.ForEach(queue.GetConsumingEnumerable(), option, _ => { });
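
To see the effect of the limit, here is a rough sketch that counts how many loop bodies run at the same time. It is not part of the original answer: the class BoundedConsumerDemo, the method RunAndMeasurePeakConcurrency, the item count and the 10 ms sleep are all assumptions made for the demonstration. The queue is filled and completed up front so that the loop terminates.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, for illustration only.
public static class BoundedConsumerDemo
{
    // Returns the highest number of loop bodies observed running simultaneously.
    public static int RunAndMeasurePeakConcurrency(int maxDegreeOfParallelism, int itemCount)
    {
        using var queue = new BlockingCollection<int>();
        for (int i = 0; i < itemCount; i++) queue.Add(i);
        queue.CompleteAdding(); // lets GetConsumingEnumerable end once the queue is drained

        int running = 0;
        int peak = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        Parallel.ForEach(queue.GetConsumingEnumerable(), options, _ =>
        {
            int now = Interlocked.Increment(ref running);

            // Raise the recorded peak if this body observed a higher count.
            int seen;
            while (now > (seen = Volatile.Read(ref peak)))
            {
                Interlocked.CompareExchange(ref peak, now, seen);
            }

            Thread.Sleep(10); // simulated work, so that bodies overlap in time
            Interlocked.Decrement(ref running);
        });

        return peak;
    }

    public static void Main()
    {
        int peak = RunAndMeasurePeakConcurrency(2, 20);
        Console.WriteLine($"Peak concurrent bodies: {peak}");
    }
}
```

With MaxDegreeOfParallelism = 2 the reported peak should never exceed 2. Note that this measures concurrent loop bodies, not ThreadPool.ThreadCount, so it only indirectly reflects the thread growth described in the question.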