Tags: c#, multithreading, parallel-processing, task-parallel-library, parallel.foreach

Persist variable on thread in Parallel.ForEach


I want to parallelise a task that depends on an object (State) that is not thread-safe and is expensive to construct.

For this reason, I was looking into partition-local variables, but either I am doing it wrong or looking for something else. This more or less represents my current implementation:

Parallel.ForEach<string, State>(folders, config, () => new State(), (source, loopState, index, threadState) =>
{
    var content = File.ReadAllText(source);        // read file
    var result = threadState.doSomething(content); // do something
    File.WriteAllText(outputFile, result);         // write output
    return threadState;
}, (threadState) => { });

However, I added a Console.WriteLine to the State constructor, and I see that the constructor is called for every iteration of the loop, which causes a big performance hit. I would like the State instance created on a thread to be passed to the subsequent iterations on that same thread.

How can I achieve that?


Solution

  • You have a few options. The simplest is to create a single State object and synchronize access to it with the lock statement:

    State state = new();
    
    Parallel.ForEach(folders, parallelOptions, folder =>
    {
        string content = File.ReadAllText(folder);
        string result;
        lock (state) { result = state.DoSomething(content); }
        File.WriteAllText(outputFile, result);
    });
    

    I assume that this is not viable, because the DoSomething method is time-consuming and serializing the calls to it would defeat the purpose of parallelization.

    Another option is to use a ThreadLocal<State>. This class provides thread-local storage, so the number of State objects created will be equal to the number of threads that the Parallel.ForEach employs.

    using ThreadLocal<State> threadLocalState = new(() => new State());
    
    Parallel.ForEach(folders, parallelOptions, folder =>
    {
        string content = File.ReadAllText(folder);
        string result = threadLocalState.Value.DoSomething(content);
        File.WriteAllText(outputFile, result);
    });
    

    This will probably create fewer State objects than the Parallel.ForEach<TSource, TLocal> overload, but the count will still not necessarily equal the configured MaxDegreeOfParallelism. Parallel.ForEach uses threads from the ThreadPool, and it is quite possible that it will use all of them over the course of the loop, provided that the list of folders is sufficiently long. You also have little control over the size of the ThreadPool, so this is not a particularly enticing solution either.
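
    To see how the two approaches behave on a given machine, the instances can simply be counted. The following is a minimal sketch, not a benchmark: DemoState is a hypothetical stand-in for the real State class, LocalStateComparison is a name made up for this illustration, and the integer range replaces the list of folders.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the expensive, non-thread-safe State class.
public sealed class DemoState
{
    public string DoSomething(string content) => content.ToUpperInvariant();
}

public static class LocalStateComparison
{
    // Returns (localInit calls made by the TLocal overload, instances created via ThreadLocal).
    public static (int LocalInitCalls, int ThreadLocalValues) Run(int itemCount, int maxDegreeOfParallelism)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
        var items = Enumerable.Range(0, itemCount);

        // Variant 1: Parallel.ForEach<TSource, TLocal>; count how often localInit runs.
        int localInitCalls = 0;
        Parallel.ForEach(items, options,
            () => { Interlocked.Increment(ref localInitCalls); return new DemoState(); },
            (item, loopState, state) => { state.DoSomething(item.ToString()); return state; },
            state => { });

        // Variant 2: ThreadLocal<T>; trackAllValues: true exposes every per-thread instance via Values.
        using var threadLocalState = new ThreadLocal<DemoState>(() => new DemoState(), trackAllValues: true);
        Parallel.ForEach(items, options, item =>
        {
            threadLocalState.Value.DoSomething(item.ToString());
        });

        return (localInitCalls, threadLocalState.Values.Count);
    }
}
```

    Calling LocalStateComparison.Run(10_000, 4) returns both counts. The numbers depend on the machine and on the current state of the ThreadPool, and neither is guaranteed to equal MaxDegreeOfParallelism.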

    The third and final option I can think of is to create a pool of State objects, and Rent/Return one in each loop:

    ObjectPool<State> statePool = new(() => new State());
    
    Parallel.ForEach(folders, parallelOptions, folder =>
    {
        State state = statePool.Rent();
        string content = File.ReadAllText(folder);
        string result = state.DoSomething(content);
        File.WriteAllText(outputFile, result);
        statePool.Return(state);
    });
    

    This way the number of State objects instantiated will not exceed the configured MaxDegreeOfParallelism, because an iteration creates a new object only when every existing one is currently rented.

    The only problem is that there is no ObjectPool<T> class in the standard .NET libraries (there is only an ArrayPool<T> class), so you'll have to find one. Here is a simple implementation based on a ConcurrentBag<T>:

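    using System;
    using System.Collections;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    
    public class ObjectPool<T> : IEnumerable<T> where T : new()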
    {
        private readonly ConcurrentBag<T> _bag = new ConcurrentBag<T>();
        private readonly Func<T> _factory;
    
        public ObjectPool(Func<T> factory = default) => _factory = factory;
    
        public T Rent()
        {
            if (_bag.TryTake(out T obj)) return obj;
            return _factory is not null ? _factory() : new T();
        }
    
        public void Return(T obj) => _bag.Add(obj);
    
        public IEnumerator<T> GetEnumerator() => _bag.GetEnumerator();
        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
    }
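
    The bound on the number of instances can be checked with a counter. The sketch below is self-contained and therefore rents and returns through a ConcurrentBag<T> directly rather than through the ObjectPool<T> class above. CountingState and PoolDemo are hypothetical names used only for this illustration, and the try/finally is an addition that returns the object even if DoSomething throws.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for State that counts how many instances are constructed.
public sealed class CountingState
{
    private static int _created;
    public static int Created => Volatile.Read(ref _created);
    public static void ResetCounter() => Interlocked.Exchange(ref _created, 0);

    public CountingState() => Interlocked.Increment(ref _created);
    public string DoSomething(string content) => content.ToUpperInvariant();
}

public static class PoolDemo
{
    // Runs the loop with a rent/return pattern and returns the number of instances constructed.
    public static int Run(int itemCount, int maxDegreeOfParallelism)
    {
        CountingState.ResetCounter();
        var bag = new ConcurrentBag<CountingState>(); // plays the role of the pool
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

        Parallel.ForEach(Enumerable.Range(0, itemCount), options, item =>
        {
            // Rent: reuse an idle instance if one exists, otherwise construct a new one.
            if (!bag.TryTake(out var state)) state = new CountingState();
            try
            {
                state.DoSomething(item.ToString());
            }
            finally
            {
                bag.Add(state); // Return the instance for later iterations.
            }
        });

        return CountingState.Created;
    }
}
```

    PoolDemo.Run(10_000, 4) should report at most 4 constructed instances, since no more than 4 iterations hold an instance at the same time.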
    

    Update: An ObjectPool<T> class exists in the Microsoft.Extensions.ObjectPool package. Its API differs slightly from the custom ObjectPool<T> implementation shown above; for example, objects are obtained with Get instead of Rent.