c# | parallel-processing | parallel.foreach | consumer | producer

How to know when to stop a parallel foreach where the consumer is also the producer in C#


I am trying to process some items in a BlockingCollection in parallel using Parallel.ForEach(). When processing an item, it can generate 0-2 more items to process. The number of items to process will always eventually reach 0.

My issue is that, because the consumer is also the producer (processing an item can generate more items to process), I can't call the BlockingCollection's CompleteAdding() when the collection is empty: another thread could still be processing an item that will generate more items. Therefore I don't know how to let the BlockingCollection/Parallel.ForEach know it can exit.

Here is an example of the situation (modified for simplicity)

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace Example
{
    class Example
    {
        static void Main(string[] args)
        {
            var process = new BlockingCollection<int>() { 30 };

            var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };

            Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
            {
                if (item > 20)
                {
                    // Some add 2 items
                    process.Add(item - 1);
                    process.Add(item - 1);
                    Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2");
                }
                else if (item > 10)
                {
                    // Some add 1 item
                    process.Add(item - 1);
                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1");
                }
                else
                {
                    // Some add 0 items
                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0");
                }
            });

            // Parallel.ForEach never exits
            Console.WriteLine("Completed Processing");

            Console.ReadKey();
        }
    }
}

I've tried changing MaxDegreeOfParallelism to the minimum of the number of items left to process and Environment.ProcessorCount, but changing it has no effect once the Parallel.ForEach is already running.

I've also tried storing a count of the number of unprocessed items and taking a lock when updating this number on each thread. When the count of unprocessed items reaches 0, I call CompleteAdding(). This doesn't work either.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace Example
{
    class Example
    {
        static void Main(string[] args)
        {
            var runningLock = new object();
            int running = 0;

            var process = new BlockingCollection<int>() { 30 };

            var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };

            Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
            {
                lock (runningLock)
                {
                    running++;
                }

                if (item > 20)
                {
                    // Some add 2 items
                    process.Add(item - 1);
                    process.Add(item - 1);
                    Console.WriteLine($"process.Count: {process.Count} | item: {item} | Added: 2 | running: {running}");
                }
                else if (item > 10)
                {
                    // Some add 1 item
                    process.Add(item - 1);
                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 1 | running: {running}");
                }
                else
                {
                    // Some add 0 items
                    Console.WriteLine($"process.Count: {process.Count}| item: {item} | Added: 0 | running: {running}");
                }

                lock (runningLock)
                {
                    running--;

                    if (running == 0 && process.Count == 0)
                    {
                        Console.WriteLine($"Stopping | running: {running} | process.Count: {process.Count}");
                        process.CompleteAdding();
                    }
                }
            });

            // Parallel.ForEach never exits
            Console.WriteLine("Completed Processing");

            Console.ReadKey();
        }
    }
}

Should I use something else instead of Parallel.ForEach for this?

Also, when setting MaxDegreeOfParallelism to 1: if the initial item in the BlockingCollection is >= 27, it processes everything normally; however, if it's <= 26, why does it stop processing items at around 16? Also, a higher MaxDegreeOfParallelism results in it stopping at a lower number.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace Example
{
    class Example
    {
        static void Main(string[] args)
        {
            // Normal
            var process = new BlockingCollection<int>() { 27 };
            // Stops around 16
            //var process = new BlockingCollection<int>() { 26 };

            var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = 1 };

            Parallel.ForEach(process.GetConsumingEnumerable(), parallelOptions, item =>
            {
                if (item > 20)
                {
                    // Some add 2 items
                    process.Add(item - 1);
                    process.Add(item - 1);
                    Console.WriteLine($"Process Size: {process.Count} | Current Num: {item} | Added: 2");
                }
                else if (item > 10)
                {
                    // Some add 1 item
                    process.Add(item - 1);
                    Console.WriteLine($"Process Size: {process.Count}| Current Num: {item} | Added: 1");
                }
                else
                {
                    // Some add 0 items
                    Console.WriteLine($"Process Size: {process.Count}| Current Num: {item} | Added: 0");
                }
            });

            // Parallel.ForEach never exits
            Console.WriteLine("Completed Processing");

            Console.ReadKey();
        }
    }
}

Here's the actual code, if anyone prefers to look at that instead of the abstracted version.


Solution

  • You were on the right track with this:

    I've also tried storing a count of the number of unprocessed items and taking a lock when updating this number on each thread. When the count of unprocessed items reaches 0, I call CompleteAdding().

    The problem is that you are actually counting the number of active workers, not the number of unprocessed items: you only increment your counter when you start processing something, so items sitting in the queue are not represented by that counter at all. To count unprocessed items, you need to increment the counter every time you add something to the queue and decrement it every time you finish processing something from the queue.

    Now, had you tried that, you would have likely run into a different problem: by default, the Parallel.ForEach() method attempts to batch up items from the source. This does not work well with a source like BlockingCollection<T> which can block during enumeration, waiting on additional data. In your example, this leads to a deadlock where Parallel.ForEach() is waiting for more items before it will queue the most recent batch, while the BlockingCollection<T> is waiting for more items to be processed and thus cause more items to be queued.

    With the ForEach() method waiting on the collection and the collection waiting on the ForEach() method, you get deadlock.

    There is a fix for that though: you can provide ForEach() with a partitioner that is specifically configured to not buffer the data, but rather queue the work items immediately as they are retrieved.
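
    In isolation, that is just the Partitioner.Create call that also appears in the full listing below; EnumerablePartitionerOptions.NoBuffering hands each item to a worker as soon as it is pulled from the enumerable instead of waiting to fill up a chunk first:

    OrderablePartitioner<int> partitioner = Partitioner.Create(process.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);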

    Putting those two strategies together, you get a version of your code that looks something like this (with some minor changes to the output that I added for diagnostic purposes):

    // Note: Interlocked lives in System.Threading, so that using directive is needed in addition to the ones shown earlier.
    static void Main(string[] args)
    {
        const int firstValue = 30;
        const int secondValues = 20;
        const int thirdValues = 10;
    
        var process = new BlockingCollection<int>() { firstValue };
    
        var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount };
        int totalItemCount = process.Count;
    
        OrderablePartitioner<int> partitioner = Partitioner.Create(process.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
    
        Parallel.ForEach(partitioner, parallelOptions, (item, state, i) =>
        {
            string message;
    
            if (item > secondValues)
            {
                // Some add 2 items
                Interlocked.Add(ref totalItemCount, 2);
                process.Add(item - 1);
                process.Add(item - 1);
                message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count} | item: {item} | Added: 2";
            }
            else if (item > thirdValues)
            {
                // Some add 1 item
                Interlocked.Increment(ref totalItemCount);
                process.Add(item - 1);
                message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 1";
            }
            else
            {
                // Some add 0 items
                message = $"{DateTime.Now.ToLongTimeString()}: process.Count: {process.Count}| item: {item} | Added: 0";
            }
    
            int newCount = Interlocked.Decrement(ref totalItemCount);
    
            if (newCount == 0)
            {
                process.CompleteAdding();
            }
    
            Console.WriteLine($"{message} | newCount: {newCount} | i: {i}");
        });
    
        // Parallel.ForEach will exit
        Console.WriteLine("Completed Processing");    
        Console.ReadKey();
    }
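
    As a footnote (not part of the original answer): because each item's children are all known by the time that item finishes, the work forms a tree, so another option is to drop the shared queue and counter entirely and let the task structure signal completion on its own. The sketch below is only illustrative and assumes C# 7.1+ for the async Main; the SemaphoreSlim named gate and the ProcessAsync helper are invented for this example. Concurrency is bounded by the semaphore, and the program finishes as soon as the root task's whole tree of child tasks has completed, so no explicit stop signal is needed.

    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    class RecursiveExample
    {
        static async Task Main()
        {
            // Bound how many items are processed at once.
            var gate = new SemaphoreSlim(Environment.ProcessorCount);

            // When this await returns, every descendant item has been processed.
            await ProcessAsync(30, gate);

            Console.WriteLine("Completed Processing");
            Console.ReadKey();
        }

        static async Task ProcessAsync(int item, SemaphoreSlim gate)
        {
            var children = new List<int>();

            await gate.WaitAsync();
            try
            {
                // Same rules as the example above: > 20 adds 2 items, > 10 adds 1.
                if (item > 20)
                {
                    children.Add(item - 1);
                    children.Add(item - 1);
                }
                else if (item > 10)
                {
                    children.Add(item - 1);
                }

                Console.WriteLine($"item: {item} | Added: {children.Count}");
            }
            finally
            {
                // Release before awaiting children so the bounded semaphore can't deadlock.
                gate.Release();
            }

            var childTasks = new List<Task>();
            foreach (var child in children)
            {
                childTasks.Add(ProcessAsync(child, gate));
            }

            await Task.WhenAll(childTasks);
        }
    }

    The trade-off is that completion tracking now lives in the task tree rather than in a counter, which is simple for work shaped like this example but keeps one (lightweight) task object alive per unfinished subtree.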