Tags: c#, multithreading, task-parallel-library, parallel.foreach

How can I prevent a Parallel.ForEach loop from changing the number of tasks at runtime?


I'm using Parallel.ForEach to do some work, and I initialize the thread-local state with localInit like this:

localInit: () => new
{
    foo = new Foo(),
    bars = CreateBars(),
}

According to the documentation:

localInit, or the function that initializes the thread-local variable. This function is called once for each partition in which the Parallel.ForEach<TSource> operation executes. Our example initializes the thread-local variable to zero.
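As a minimal sketch of the contract quoted above (assuming a simple sum as the workload; `numbers` and `total` are illustrative names), each worker's state starts at zero in localInit, is updated without locking in the body, and is merged into the shared result in localFinally:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Illustrative input: the integers 1..10000.
int[] numbers = Enumerable.Range(1, 10_000).ToArray();
long total = 0;

Parallel.ForEach(
    source: numbers,
    // localInit: each worker's private subtotal starts at zero.
    localInit: () => 0L,
    // body: add the current item to this worker's subtotal; no locking needed.
    body: (n, loopState, subtotal) => subtotal + n,
    // localFinally: merge the worker's subtotal into the shared total atomically.
    localFinally: subtotal => Interlocked.Add(ref total, subtotal));

Console.WriteLine(total);
```

Because localFinally merges whatever each worker accumulated, the total is correct however many times localInit runs; only the number of localInit calls varies.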

So I tried to use it that way, but I observed that the loop is constantly tearing down and creating new tasks, which results in frequent calls to localInit. In my opinion this is counterproductive and doesn't work as desired.

I thought that when Parallel.ForEach created, for example, four partitions, it would keep them alive until it had iterated over all items, but it doesn't. It calls localFinally and localInit several hundred times for a collection with a few thousand items. Why is that?

Can this behavior somehow be prevented? I was really hoping to save some resources but it doesn't really let me.


Here's what the loop looks like:

var parallelLoopResult = Parallel.ForEach
(
    source: items,
    parallelOptions: parallelOptions,
    localInit: () => new
    {
        foo = new Foo(),
        bars = CreateBars(),
    },
    body: (item, loopState, i, local) =>
    {
        parallelOptions.CancellationToken.ThrowIfCancellationRequested();

        var results = local.bars.Select(x => ...).ToList();
        
        ....
        
        return local;
    },
    localFinally: local =>
    {
        local.foo.Dispose();
        lock (aggregateLock)
        {
            ... process transformed bars
        }
    }
);
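A self-contained approximation of the loop above can be used to count how often localInit and localFinally actually run on a given machine. In this sketch `MemoryStream` stands in for the disposable `Foo`, `CreateBars` is a hypothetical helper returning a small list, and the per-item work is an arbitrary sum, since the real work is elided in the question:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's CreateBars().
static List<int> CreateBars() => new List<int> { 1, 2, 3 };

var items = Enumerable.Range(0, 5_000).ToList();
var aggregateLock = new object();
long aggregated = 0;   // combined result of all workers
int initCalls = 0;     // how many times localInit ran
int finallyCalls = 0;  // how many times localFinally ran

using var cts = new CancellationTokenSource();
var parallelOptions = new ParallelOptions
{
    CancellationToken = cts.Token,
    MaxDegreeOfParallelism = Environment.ProcessorCount
};

Parallel.ForEach(
    source: items,
    parallelOptions: parallelOptions,
    localInit: () =>
    {
        Interlocked.Increment(ref initCalls);
        // Anonymous-type properties are read-only, so a one-element array
        // holds the mutable per-worker subtotal. MemoryStream plays "Foo".
        return new { foo = new MemoryStream(), bars = CreateBars(), sum = new long[1] };
    },
    body: (item, loopState, i, local) =>
    {
        parallelOptions.CancellationToken.ThrowIfCancellationRequested();
        // Per-item work on this worker's own bars; no locking needed here.
        local.sum[0] += local.bars.Select(x => (long)x * item).Sum();
        return local;
    },
    localFinally: local =>
    {
        Interlocked.Increment(ref finallyCalls);
        local.foo.Dispose();
        lock (aggregateLock)
        {
            aggregated += local.sum[0];
        }
    });

Console.WriteLine($"localInit: {initCalls}, localFinally: {finallyCalls}, result: {aggregated}");
```

The exact number of localInit calls depends on the runtime version, core count and how long each item takes, so it is printed rather than predicted; the invariant worth checking is that every localInit is matched by a localFinally.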

ParallelOptions:

var parallelOptions = new ParallelOptions
{
    CancellationToken = cancellationTokenSource.Token,
#if DEBUG
    MaxDegreeOfParallelism = 1
    //MaxDegreeOfParallelism = Environment.ProcessorCount
#else
    MaxDegreeOfParallelism = Environment.ProcessorCount
#endif
};
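MaxDegreeOfParallelism limits how many loop bodies run at the same time; it does not pin the loop to a fixed set of long-lived workers. A rough check, assuming an arbitrary cap of 2 and `Thread.Sleep(1)` as a stand-in for real work, records the peak number of concurrently running bodies alongside the number of localInit calls:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int running = 0;    // bodies currently executing
int peak = 0;       // highest value 'running' has reached
int initCalls = 0;  // number of localInit invocations

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

Parallel.ForEach(
    Enumerable.Range(0, 100),
    options,
    () => { Interlocked.Increment(ref initCalls); return 0; },
    (item, loopState, count) =>
    {
        int now = Interlocked.Increment(ref running);
        // Raise 'peak' to 'now' if needed, using a compare-and-swap loop.
        int seen;
        while (now > (seen = Volatile.Read(ref peak)) &&
               Interlocked.CompareExchange(ref peak, now, seen) != seen) { }
        Thread.Sleep(1); // stand-in for real work
        Interlocked.Decrement(ref running);
        return count + 1;
    },
    count => { });

Console.WriteLine($"peak concurrency: {peak}, localInit calls: {initCalls}");
```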

Solution

  • If I understand the code correctly, Parallel.ForEach() restarts each Task every few hundred milliseconds. Each replacement Task starts with a fresh call to localInit and ends with a call to localFinally, so if each iteration is substantial (as it generally should be) and the loop runs for a long time, you will get lots of Tasks and thus lots of calls to localInit and localFinally. The reason for this is fairness with regard to other code in the same process that also uses the same ThreadPool.

    I don't think there is a way to change this behavior of Parallel.ForEach(). I think a good way to solve this is to write your own simple version of Parallel.ForEach(). Considering that you can take advantage of Partitioner<T> and depending on what features of Parallel.ForEach() you need, it could be relatively simple. For example, something like:

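    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    public static void MyParallelForEach<TSource, TLocal>(
        IEnumerable<TSource> source, int degreeOfParallelism,
        Func<TLocal> localInit, Func<TSource, TLocal, TLocal> body, Action<TLocal> localFinally)
    {
        if (degreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException(nameof(degreeOfParallelism));

        // One dynamic partition source shared by all workers: each foreach over it
        // gets its own enumerator, which pulls the next available items on demand.
        var partitionerSource = Partitioner.Create(source).GetDynamicPartitions();

        // Every worker calls localInit once, processes items until the source
        // is exhausted, and then calls localFinally once.
        Action taskAction = () =>
        {
            var localState = localInit();

            foreach (var item in partitionerSource)
            {
                localState = body(item, localState);
            }

            localFinally(localState);
        };

        // Start degreeOfParallelism - 1 workers on the thread pool...
        var tasks = new Task[degreeOfParallelism - 1];

        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = Task.Run(taskAction);
        }

        // ...and use the calling thread as the last worker.
        taskAction();

        Task.WaitAll(tasks);
    }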
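The restart behavior described in the answer can be pictured with a deliberately simplified, single-threaded model. This is not the TPL implementation: a fixed item budget (`itemsPerSlice`, a hypothetical parameter) replaces the real time-based slice, and each slice gets fresh local state, just as each replacement Task gets a fresh localInit call:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified single-threaded model, NOT the real TPL implementation.
// Assumption: a fixed item budget per slice stands in for the time-based slice.
const int itemsPerSlice = 3;
var queue = new Queue<int>(Enumerable.Range(1, 10));
int initCalls = 0, finallyCalls = 0;
long total = 0;

while (queue.Count > 0)
{
    // A "replacement task" starts and gets fresh local state (localInit).
    long local = 0;
    initCalls++;

    // Process items until the slice budget is used up or the source runs dry.
    int processed = 0;
    while (processed < itemsPerSlice && queue.TryDequeue(out int item))
    {
        local += item;
        processed++;
    }

    // The slice ends: local state is flushed (localFinally) before the next one.
    finallyCalls++;
    total += local;
}

Console.WriteLine($"slices: {initCalls}, localFinally calls: {finallyCalls}, total: {total}");
```

With 10 items and a budget of 3, the model runs four slices, so its "localInit" step runs four times even though only one worker exists at any moment.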
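A usage sketch for MyParallelForEach, with the helper repeated (as a static local function, slightly condensed) so the block compiles on its own. The workload and the choice of 4 workers are illustrative assumptions; the point is that localInit and localFinally each run exactly once per worker, no matter how many items there are:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int initCalls = 0, finallyCalls = 0;
long total = 0;
const int workers = 4; // illustrative degree of parallelism

MyParallelForEach<int, long>(
    Enumerable.Range(1, 10_000),
    workers,
    localInit: () => { Interlocked.Increment(ref initCalls); return 0L; },
    body: (item, subtotal) => subtotal + item,
    localFinally: subtotal =>
    {
        Interlocked.Increment(ref finallyCalls);
        Interlocked.Add(ref total, subtotal);
    });

Console.WriteLine($"localInit: {initCalls}, localFinally: {finallyCalls}, total: {total}");

// Same approach as the answer's MyParallelForEach, repeated so this block is self-contained.
static void MyParallelForEach<TSource, TLocal>(
    IEnumerable<TSource> source, int degreeOfParallelism,
    Func<TLocal> localInit, Func<TSource, TLocal, TLocal> body, Action<TLocal> localFinally)
{
    var partitionerSource = Partitioner.Create(source).GetDynamicPartitions();

    // One worker: a single localInit, drain the shared source, a single localFinally.
    void Worker()
    {
        var localState = localInit();
        foreach (var item in partitionerSource)
            localState = body(item, localState);
        localFinally(localState);
    }

    var tasks = new Task[degreeOfParallelism - 1];
    for (int i = 0; i < tasks.Length; i++)
        tasks[i] = Task.Run(() => Worker());
    Worker(); // the calling thread is the last worker
    Task.WaitAll(tasks);
}
```

Unlike Parallel.ForEach, this helper does not take a ParallelOptions, so cancellation has to be checked inside body, for example by calling ThrowIfCancellationRequested() on the token as the question's loop already does.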