Binding source thread in PLINQ


I have a computation that I'm parallelizing using PLINQ as follows:

  • The source, an IEnumerable<T> named source, provides objects read from a file.

  • I have a heavyweight computation HeavyComputation I need to do on each T, and I want these farmed out across threads, so I am using PLINQ like: AsParallel().Select(HeavyComputation)

Here's where it gets interesting: due to constraints on the file reader type that provides the source, I need source to be enumerated on the initial thread, not on the parallel workers. In other words, the full evaluation of the source must be bound to the main thread. However, it seems the source is actually enumerated on the worker threads.

My question is: Is there a straightforward way to modify this code to bind the enumeration of the source to the initial thread, while farming out the heavy work to the parallel workers? Keep in mind that just doing an eager .ToList() before the AsParallel() is not an option here, as the data stream coming from the file is massive.

Here is some example code that demonstrates the problem as I see it:

using System.Threading;
using System.Collections.Generic;
using System.Linq;
using System;

public class PlinqTest
{
    private static string FormatItems<T>(IEnumerable<T> source)
    {
        return String.Format("[{0}]", String.Join(";", source));
    }

    public static void Main()
    {
        var expectedThreadIds = new[] { Thread.CurrentThread.ManagedThreadId };

        var threadIds = Enumerable.Range(1, 1000)
                .Select(x => Thread.CurrentThread.ManagedThreadId) // (1)
                .AsParallel()
                .WithDegreeOfParallelism(8)
                .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                .AsOrdered()
                .Select(x => x)                                    // (2)
                .ToArray();

        // In the computation above, the lambda in (1) is a
        // stand in for the file-reading operation that we
        // want to be bound to the main thread, while the
        // lambda in (2) is a stand-in for the "expensive
        // computation" that we want to be farmed out to the
        // parallel worker threads.  In fact, (1) is being
        // executed on all threads, as can be seen from the
        // output.

        Console.WriteLine("Expected thread IDs: {0}",
                          FormatItems(expectedThreadIds));
        Console.WriteLine("Found thread IDs: {0}",
                          FormatItems(threadIds.Distinct()));
    }
}

Example output I get is:

Expected thread IDs: [1]
Found thread IDs: [7;4;8;6;11;5;10;9]

Solution

  • This is fairly straightforward (although perhaps not as concise) if you abandon PLINQ and just use the Task Parallel Library explicitly:

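    // Requires: using System.Threading.Tasks;

    // Limits the parallelism of the "expensive task"
    var semaphore = new SemaphoreSlim(8);

    var tasks = Enumerable.Range(1, 1000)
        // Stand-in for the file read; runs on the thread that enumerates "tasks"
        .Select(x => Thread.CurrentThread.ManagedThreadId)
        .Select(async x =>
        {
            await semaphore.WaitAsync();
            try
            {
                // Stand-in for the expensive computation; runs on a thread-pool thread
                return await Task.Run(() => Tuple.Create(x, Thread.CurrentThread.ManagedThreadId));
            }
            finally
            {
                // Free the slot even if the computation throws
                semaphore.Release();
            }
        });

    // Task.WhenAll enumerates "tasks" (and therefore the source) on the calling thread
    return Task.WhenAll(tasks).Result;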
    

    Note that I'm using Tuple.Create to record both the thread ID coming from the main thread and the thread ID coming from the spawned task. From my test, the former is always the same for every tuple, while the latter varies, which is as it should be.

    The semaphore makes sure that the degree of parallelism never goes above 8 (although with the inexpensive task of creating a tuple this isn't very likely anyway). Once 8 tasks are running, any new tasks will wait until there are spots available on the semaphore.
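
    One caveat with the semaphore approach: it throttles the expensive work but not the enumeration, so the main thread can read far ahead and leave one pending task per item, which may matter given that the stream is massive. A possible alternative, offered only as a rough sketch, is a bounded producer/consumer queue: the calling thread enumerates the source and blocks when the queue is full, while worker tasks drain it. The class name BoundedPipeline, the method Run, and its parameters workerCount and capacity are all hypothetical names chosen for illustration. Results come back in no particular order, and if every worker faults while the queue is full the producer would block, so real code would need cancellation on top of this.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not from the original answer).
public static class BoundedPipeline
{
    // Enumerates "source" only on the calling thread, while "workerCount"
    // tasks apply "heavy" to the items. "capacity" bounds how far the
    // calling thread may read ahead of the workers.
    public static List<TResult> Run<T, TResult>(
        IEnumerable<T> source,
        Func<T, TResult> heavy,
        int workerCount,
        int capacity)
    {
        var results = new ConcurrentBag<TResult>(); // unordered results

        using (var queue = new BlockingCollection<T>(capacity))
        {
            // Start the consumers; each one drains the queue until
            // CompleteAdding has been called and the queue is empty.
            var workers = Enumerable.Range(0, workerCount)
                .Select(_ => Task.Run(() =>
                {
                    foreach (var item in queue.GetConsumingEnumerable())
                    {
                        results.Add(heavy(item));
                    }
                }))
                .ToArray();

            try
            {
                // Producer: runs on the calling thread. Add blocks while
                // the queue already holds "capacity" items.
                foreach (var item in source)
                {
                    queue.Add(item);
                }
            }
            finally
            {
                // Let the workers finish before the queue is disposed.
                queue.CompleteAdding();
                Task.WaitAll(workers);
            }
        }

        return results.ToList();
    }
}
```

    With the names from the question, a call might look like BoundedPipeline.Run(source, HeavyComputation, workerCount: 8, capacity: 64), where 64 is an arbitrary read-ahead limit. If the original order matters, one option is to pair each item with its index before queuing it and sort the results afterwards.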