
TPL Dataflow Thread-Local Data


Is there a good way to pass thread-local data into an ActionBlock, so that if you set MaxDegreeOfParallelism in its ExecutionDataflowBlockOptions to a value greater than 1, each task that executes the action has its own thread-local data?

Here is some of my code that will perhaps clarify what I want to do:

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 12
};

ActionBlock<int> actionBlock = new ActionBlock<int>(PerformAction, options);

List<int> resultsList = new List<int>();

void PerformAction(int i)
{
    // Do some work.

    // Add the results to resultsList.

    // I want to make sure that each thread that executes this method has its
    // own copy of resultsList.
}

I want the ActionBlock to call a thread-local init function that I supply. Something like this:

new ActionBlock<int>(PerformAction, options, () => new List<int>()); 

It would then pass my thread-local data into my action method:

void PerformAction(int i, List<int> localUserData) {...}

Solution

  • I still don't understand why you need a thread-local list in a dataflow block. You're right that TPL Dataflow (TDF) has no explicit support for thread-local values, the way Parallel.ForEach() does. That doesn't mean you can't use them; you just have to manage everything manually with ThreadLocal<T>. (I think [ThreadStatic] wouldn't work well here, because it doesn't let you track all the thread-local instances.) For example:

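    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks.Dataflow;
    
    internal static class Program
    {
        // One list per thread; trackAllValues: true makes the Values property usable later.
        private static ThreadLocal<List<int>> threadLocalList;
    
        private static void Main()
        {
            threadLocalList = new ThreadLocal<List<int>>(() => new List<int>(), trackAllValues: true);
    
            // The cast picks the Action<int> constructor overload explicitly.
            var block = new ActionBlock<int>(
                (Action<int>)PerformAction,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
    
            for (int i = 0; i < 10; i++)
                block.Post(i);
    
            // Signal that no more items are coming, then wait for processing to finish.
            block.Complete();
            block.Completion.Wait();
    
            // Print every list that was created, one line per thread.
            foreach (var list in threadLocalList.Values)
                Console.WriteLine(string.Join(", ", list));
    
            threadLocalList.Dispose();
        }
    
        private static void PerformAction(int i)
        {
            // Each thread appends only to its own list, so no locking is needed.
            threadLocalList.Value.Add(i * i);
        }
    }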
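
The ThreadLocal<T> approach can also be wrapped to get closer to the calling convention asked for in the question, where the action receives its local data as a second parameter. What follows is only a sketch: LocalStateBlock, its Create method, and the Demo class are hypothetical names, not part of TPL Dataflow. It also assumes the action is synchronous, so that each invocation runs on one thread from start to finish; with an async delegate, code after an await may resume on a different thread and see a different list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper (not part of TPL Dataflow): adapts a two-parameter
// action so that it receives the current thread's value as its second argument.
public static class LocalStateBlock
{
    public static ActionBlock<TInput> Create<TInput, TLocal>(
        Action<TInput, TLocal> action,
        ExecutionDataflowBlockOptions options,
        ThreadLocal<TLocal> locals)
    {
        // locals.Value is read on the thread that processes the item,
        // so each thread gets (and lazily creates) its own instance.
        Action<TInput> body = item => action(item, locals.Value);
        return new ActionBlock<TInput>(body, options);
    }
}

public static class Demo
{
    public static List<int> Run()
    {
        // trackAllValues: true is required for reading locals.Values afterwards.
        using (var locals = new ThreadLocal<List<int>>(
            () => new List<int>(), trackAllValues: true))
        {
            var block = LocalStateBlock.Create<int, List<int>>(
                (i, list) => list.Add(i * i),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 },
                locals);

            for (int i = 0; i < 10; i++)
                block.Post(i);

            block.Complete();
            block.Completion.Wait();

            // Merge the per-thread lists once all processing has finished.
            return locals.Values.SelectMany(l => l).OrderBy(x => x).ToList();
        }
    }
}
```

The caller keeps ownership of the ThreadLocal<T>, so it can read Values after the block completes and dispose it afterwards. The state here is per thread, not per item or per task, so the number of lists depends on how many thread-pool threads end up running the block.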