Tags: c#, task-parallel-library, plinq

Understanding AsParallel and AsSequential: which part of the LINQ query is concurrent?


I'm trying to understand whether I can use a non-thread-safe class to the left of (that is, upstream of) an AsParallel call in a query. Something like:

src.Select(item => nonSafeClass.Process(item))
   .AsParallel()
   .Select(item => DoComputationalIntenseButThreadSafeWork(item));

I ran the following code to see which parts of the query chain execute in parallel and which do not:

IEnumerable<int> array = Enumerable.Range(0, short.MaxValue).ToArray();
array.Select(i =>
    {
        Console.WriteLine("Step One: {0}", Thread.CurrentThread.ManagedThreadId);
        return i;
    }).AsParallel().Select(i =>
        {
            Console.WriteLine("Step Two: {0}", Thread.CurrentThread.ManagedThreadId);
            return i;

        }).AsSequential().Select(i =>
            {
                Console.WriteLine("Step Three: {0}", Thread.CurrentThread.ManagedThreadId);
                return i;
            }).ToList();

But to my surprise, both "Step One" and "Step Three" appear with different thread IDs. I expected to see different thread IDs only for "Step Two", because it sits between an AsParallel and an AsSequential. Is my thinking wrong?


Solution

  • This is because of deferred execution and the way chained LINQ queries are evaluated.

    If you change it to the simplest case of

    array.Select(i =>
            {
                Console.WriteLine("Step One: {0}", Thread.CurrentThread.ManagedThreadId);
                return i;
            }).Select(i =>
            {
                Console.WriteLine("Step Two: {0}", Thread.CurrentThread.ManagedThreadId);
                return i;
    
            }).Select(i =>
            {
                Console.WriteLine("Step Three: {0}", Thread.CurrentThread.ManagedThreadId);
                return i;
            }).ToList();
    

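    You will see the steps interleaved per element: Step One, Step Two, Step Three, Step One, Step Two, Step Three, and so on. Each element is pulled through the whole chain before the next one is read.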
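
    To make that ordering easy to check without reading console output, here is a minimal sketch that records the order in which the lambdas run. The class name DeferredDemo and the method Trace are made up for this illustration; only Enumerable.Range, Select and ToList come from LINQ itself.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeferredDemo
{
    // Builds three chained Select calls over `count` items and returns the
    // order in which the step lambdas actually executed.
    public static List<string> Trace(int count)
    {
        var log = new List<string>();

        var query = Enumerable.Range(0, count)
            .Select(i => { log.Add($"One({i})"); return i; })
            .Select(i => { log.Add($"Two({i})"); return i; })
            .Select(i => { log.Add($"Three({i})"); return i; });

        // Deferred execution: building the query has not run any lambda yet.
        if (log.Count != 0)
            throw new InvalidOperationException("query executed eagerly");

        // Enumeration happens here, one element at a time through all three steps.
        query.ToList();
        return log;
    }

    public static void Main()
    {
        // Prints: One(0) Two(0) Three(0) One(1) Two(1) Three(1)
        Console.WriteLine(string.Join(" ", Trace(2)));
    }
}
```

    The point of the sketch is that no step finishes for all elements before the next step starts, which is why a thread switch in the middle of the chain affects the steps around it.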

    Now imagine your assumption were correct:

    The first Select() runs on thread 1 (the calling thread), the part after AsParallel() runs on some other thread, and the Select() after AsSequential() has to run on thread 1 again. Because elements flow through the chain one at a time, thread 1 would be blocked while each element is processed elsewhere, so moving the middle step to another thread would make no difference.

    The flow you were expecting would be 1 -> x -> 1 for the first element, followed by 1 -> y -> 1 for the second, and so on.

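    What actually happens is that a Select() placed in front of AsParallel() is simply the data source of the parallel query. The PLINQ worker threads pull elements from that source themselves, so the Step One lambda runs on whichever worker thread asks for the next element. Again, this makes sense because going 1 -> x -> 1 -> y would not let anything run in "parallel".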
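
    One way to probe this yourself is to count how many threads run the lambda in front of AsParallel() and whether two calls to it ever overlap. The sketch below is only an experiment: UpstreamProbe, Report and Run are hypothetical names. My understanding is that for a source that is not an array or IList<T>, current PLINQ implementations hand out elements under a lock, so I would expect MaxOverlap to stay at 1. Treat that as an implementation detail to verify on your runtime, not as a documented guarantee.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

public static class UpstreamProbe
{
    public sealed class Report
    {
        public int ItemsSeen;       // how many times the upstream lambda ran
        public int MaxOverlap;      // highest number of simultaneous upstream calls observed
        public int DistinctThreads; // number of threads that ran the upstream lambda
        public int ResultCount;     // size of the final list
    }

    public static Report Run(int count)
    {
        int inFlight = 0, maxOverlap = 0, seen = 0;
        var threads = new ConcurrentDictionary<int, byte>();

        var result = Enumerable.Range(0, count)
            .Select(i =>
            {
                int now = Interlocked.Increment(ref inFlight);
                int prev;
                // Lock-free "max": retry until our value is stored or is no longer the largest.
                while (now > (prev = Volatile.Read(ref maxOverlap)) &&
                       Interlocked.CompareExchange(ref maxOverlap, now, prev) != prev) { }

                threads.TryAdd(Environment.CurrentManagedThreadId, 0);
                Interlocked.Increment(ref seen);
                Interlocked.Decrement(ref inFlight);
                return i;
            })
            .AsParallel()
            .Select(i => i * 2) // stand-in for the thread-safe, CPU-heavy step
            .ToList();

        return new Report
        {
            ItemsSeen = seen,
            MaxOverlap = maxOverlap,
            DistinctThreads = threads.Count,
            ResultCount = result.Count
        };
    }

    public static void Main()
    {
        var r = Run(short.MaxValue);
        Console.WriteLine($"items={r.ItemsSeen} threads={r.DistinctThreads} maxOverlap={r.MaxOverlap}");
    }
}
```

    Even if the overlap count does stay at 1, the upstream lambda still moves between threads, so a class that depends on thread affinity or thread-local state is not safe there.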

    Try it by running a simplified version:

    array.Select(i =>
            {
                Console.WriteLine("Step One: {0}", Thread.CurrentThread.ManagedThreadId);
                return i;
            }).AsParallel().Select(i =>
            {
                Console.WriteLine("Step Two: {0}", Thread.CurrentThread.ManagedThreadId);
                return i;
    
            }).ToList();
    

    You will see that Step One and Step Two for a given element are done in "sequence" on the same thread, while different elements may be handled by different threads.

    Whatever follows AsSequential(), however, runs on the thread that enumerates the query, which here is the calling thread (the one that invoked the final ToList()).

    As such, I would expect Step One and Step Two to run together on worker threads, which are generally different from the calling thread, and Step Three to run on the thread that started the chain.

    If you want to achieve the behavior you described, simply change your query to this:

    array.Select(i =>
            {
                Console.WriteLine("Step One: {0}", Thread.CurrentThread.ManagedThreadId);
                return i;
            }).ToList().AsParallel().Select(i =>
            {
                Console.WriteLine("Step Two: {0}", Thread.CurrentThread.ManagedThreadId);
                return i;
    
            }).AsSequential().Select(i =>
            {
                Console.WriteLine("Step Three: {0}", Thread.CurrentThread.ManagedThreadId);
                return i;
            }).ToList();
    

    The first ToList() forces Step One to be evaluated completely on the calling thread. AsParallel() then spreads the Step Two work across worker threads (subject to ThreadPool availability). Finally, AsSequential() makes sure the remaining sequential part runs on the calling thread.
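
    As a rough way to confirm this, the sketch below runs the same shape of query but records thread IDs in collections so they can be compared with the calling thread. StageThreads, Result, Run and NonSafeCounter are hypothetical names for this illustration, and NonSafeCounter is only a stand-in for the non-thread-safe class from the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

public static class StageThreads
{
    // Hypothetical stand-in for a class that must not be called concurrently.
    public sealed class NonSafeCounter
    {
        private int _calls; // deliberately unsynchronized
        public int Calls => _calls;
        public int Process(int item) { _calls++; return item; }
    }

    public sealed class Result
    {
        public int CallerThread;
        public ICollection<int> StepOne, StepTwo, StepThree; // thread IDs seen per step
        public int ProcessCalls;
    }

    public static Result Run(int count)
    {
        var nonSafe = new NonSafeCounter();
        var one = new ConcurrentDictionary<int, byte>();
        var two = new ConcurrentDictionary<int, byte>();
        var three = new ConcurrentDictionary<int, byte>();

        Enumerable.Range(0, count)
            .Select(i => { one.TryAdd(Environment.CurrentManagedThreadId, 0); return nonSafe.Process(i); })
            .ToList()        // Step One is fully evaluated here, on the calling thread
            .AsParallel()
            .Select(i => { two.TryAdd(Environment.CurrentManagedThreadId, 0); return i; })
            .AsSequential()  // back to LINQ to Objects; enumerated by the caller
            .Select(i => { three.TryAdd(Environment.CurrentManagedThreadId, 0); return i; })
            .ToList();

        return new Result
        {
            CallerThread = Environment.CurrentManagedThreadId,
            StepOne = one.Keys,
            StepTwo = two.Keys,
            StepThree = three.Keys,
            ProcessCalls = nonSafe.Calls
        };
    }

    public static void Main()
    {
        var r = Run(short.MaxValue);
        Console.WriteLine($"caller:     {r.CallerThread}");
        Console.WriteLine($"Step One:   {string.Join(",", r.StepOne)}");
        Console.WriteLine($"Step Two:   {string.Join(",", r.StepTwo)}");   // varies from run to run
        Console.WriteLine($"Step Three: {string.Join(",", r.StepThree)}");
    }
}
```

    The cost of the extra ToList() is that all Step One results are buffered in memory before any parallel work starts, which is the trade-off for keeping the non-thread-safe call on a single thread.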