Tags: c#, .net, parallel-processing, producer-consumer, plinq

Parallel ordered consumable


I would like to process some items in parallel. This processing is independent (order does not matter) and returns an output. These outputs should then be relayed back in order as quickly as possible.

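That is to say, the method should behave equivalently to this sequential version, except that process is called in parallel:

IEnumerable<TResult> OrderedParallelImmediateSelect<TSource, TResult> (IEnumerable<TSource> source, Func<TSource, TResult> process)
{
    foreach (var input in source) {
        var result = process (input);   // independent per-item work
        yield return result;            // results leave in input order
    }
}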

Accordingly, processing should be started in input order as far as possible. Since items are (of course) not guaranteed to finish in order, the result collector must wait for delayed results.

As soon as the next result in order comes in, it must be returned immediately. We cannot wait for the whole input to be processed before sorting the results.

Here is an example of how this could look:

begin 0
begin 1     <-- we start processing in increasing order
begin 2
complete 1  <-- 1 is complete but we are still waiting for 0
begin 3
complete 0  <-- 0 is complete, so we can return it and 1, too
return 0
return 1
begin 4
begin 5
complete 4  <-- 2 and 3 are missing before we may return this
complete 2  <-- 2 is done, 4 must keep waiting
return 2
begin 6
complete 3  <-- 3 and 4 can now be returned
return 3
return 4
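The bookkeeping in this trace can be sketched as a small reorder buffer: finished results are parked in a dictionary keyed by input index, and after each completion every consecutive result starting at the next expected index is released. This is a minimal sketch, assuming each completion arrives tagged with its input index. ReorderBuffer and Completions are hypothetical names, and Completions merely replays the completion order 1, 0, 4, 2, 3 from the trace instead of doing real parallel work.

```csharp
using System;
using System.Collections.Generic;

var log = new List<string>();

// Hypothetical helper: takes (index, value) pairs in completion order and
// yields them in index order, each as soon as all lower indices were yielded.
IEnumerable<(int Index, T Value)> ReorderBuffer<T>(IEnumerable<(int Index, T Value)> completions)
{
    var pending = new Dictionary<int, T>(); // finished, but waiting for an earlier index
    int next = 0;                           // index of the next result to return
    foreach (var (index, value) in completions)
    {
        pending[index] = value;
        // Release every consecutive finished result starting at 'next'.
        while (pending.Remove(next, out var ready))
        {
            yield return (next, ready);
            next++;
        }
    }
}

// Hypothetical stand-in for real work: replays a fixed completion order.
IEnumerable<(int Index, string Value)> Completions(int[] order)
{
    foreach (var i in order)
    {
        log.Add($"complete {i}");
        yield return (i, $"result {i}");
    }
}

foreach (var (index, _) in ReorderBuffer(Completions(new[] { 1, 0, 4, 2, 3 })))
    log.Add($"return {index}");

Console.WriteLine(string.Join(Environment.NewLine, log));
```

Because both methods are lazy iterators, the log interleaves exactly like the complete/return lines of the trace: complete 1, complete 0, return 0, return 1, complete 4, complete 2, return 2, complete 3, return 3, return 4.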

If at all possible, I would like to perform processing on a regular thread pool.

Is this scenario something .NET provides a solution for? I've built a custom solution, but would prefer to use something simpler.

I'm aware of many similar questions, but the answers all seem to either wait for every item to finish processing before returning anything, or not guarantee ordered results.
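For comparison, a hand-rolled version of such a custom solution might look like the following. This is a sketch, not the asker's actual code: it assumes a bounded window of started items is acceptable, and OrderedParallelImmediateSelect's extra parameters (process, maxInFlight) are hypothetical. Each item is started on the regular thread pool via Task.Run in input order; the oldest task is always the next result, so results are yielded from the head of a queue, either as soon as the head is already complete or, when the window is full, by blocking on it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not a .NET API): runs 'process' on the thread pool with at
// most 'maxInFlight' items (assumed >= 1) in flight, yielding results in input order.
IEnumerable<TResult> OrderedParallelImmediateSelect<TSource, TResult>(
    IEnumerable<TSource> source, Func<TSource, TResult> process, int maxInFlight)
{
    var inFlight = new Queue<Task<TResult>>(); // started tasks, oldest first
    foreach (var item in source)
    {
        inFlight.Enqueue(Task.Run(() => process(item)));
        // Hand out results that are already finished, strictly in order.
        while (inFlight.Count > 0 && inFlight.Peek().IsCompleted)
            yield return inFlight.Dequeue().GetAwaiter().GetResult();
        // Window full: block until the oldest task finishes, then return it.
        if (inFlight.Count >= maxInFlight)
            yield return inFlight.Dequeue().GetAwaiter().GetResult();
    }
    // Source exhausted: drain the remaining tasks in order.
    while (inFlight.Count > 0)
        yield return inFlight.Dequeue().GetAwaiter().GetResult();
}

// Example work with uneven durations, so items finish out of order.
int Process(int item)
{
    Thread.Sleep((item * 37) % 50);
    return item * item;
}

var results = new List<int>();
foreach (var r in OrderedParallelImmediateSelect(Enumerable.Range(0, 20), Process, maxInFlight: 4))
{
    Console.WriteLine($"=> {r}");
    results.Add(r);
}
```

One design consequence of the lazy iterator: no new items are started while the consumer is busy with a yielded result, and if the consumer stops enumerating early, already started tasks keep running. GetAwaiter().GetResult() rethrows a failed item's original exception rather than an AggregateException.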

Here's an attempt that sadly does not seem to work. Replacing IEnumerable with ParallelQuery had no effect.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

int Process (int item)
{
    Console.WriteLine ($"+ {item}");
    Thread.Sleep (new Random (item).Next (100, 1000));
    Console.WriteLine ($"- {item}");
    return item;
}
void Output (IEnumerable<int> items)
{
    foreach (var it in items) {
        Console.WriteLine ($"=> {it}");
    }
}

IEnumerable<int> OrderedParallelImmediateSelect (IEnumerable<int> source)
{
    // This processes in parallel but does not return the results immediately
    return source.AsParallel ().AsOrdered ().Select (Process);
}

var input = Enumerable.Range (0, 20);
Output (OrderedParallelImmediateSelect (input));

Output:

+0 +1 +3 +2 +4 +5 +6 +7 +9 +10 +11 +8 -1 +12 -3 +13 -5 +14 -7 +15 -9 +16 -11 +17 -14 +18 -16 +19 -0 -18 -2 -4 -6 -8 -13 -10 -15 -17 -12 -19 =>0 =>1 =>2 =>3 =>4 =>5 =>6 =>7 =>8 =>9 =>10 =>11 =>12 =>13 =>14 =>15 =>16 =>17 =>18 =>19


Solution

  • I created this program as a console application:

    using System;
    using System.Linq;
    using System.Threading;
    
    namespace PlayAreaCSCon
    {
        class Program
        {
            static void Main(string[] args)
            {
                var items = Enumerable.Range(0, 1000);
                int prodCount = 0;
    
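                foreach(var item in items.AsParallel()
                    .AsOrdered()                                        // results keep source order
                    .WithMergeOptions(ParallelMergeOptions.NotBuffered) // hand each result out as soon as possible
                    .Select((i) =>
                    {
                        Thread.Sleep(i % 100);                // simulate uneven work
                        Interlocked.Increment(ref prodCount); // count items that have finished processing
                        return i;
                    }))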
                {
                    Console.WriteLine(item);
                }
                Console.ReadLine();
            }
        }
    }
    

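    I then set a breakpoint on the Console.WriteLine(item) line. When that breakpoint is first hit, prodCount is 5, so results are definitely being consumed before all processing has completed. After removing the breakpoint, all results still appear in the original order. The difference from the attempt in the question is WithMergeOptions(ParallelMergeOptions.NotBuffered): with the default merge option (AutoBuffered), PLINQ accumulates results in an output buffer before handing them to the consumer, whereas NotBuffered makes each element available as soon as it has been computed (and, because of AsOrdered, as soon as all earlier elements have been yielded).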
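    Applied to the question's own attempt, the change is a single extra call. The sketch below keeps the question's Process and OrderedParallelImmediateSelect, adds WithMergeOptions(ParallelMergeOptions.NotBuffered), and records the +, - and => events in one list (guarded by a lock, because Process runs on several threads) so the interleaving can be inspected afterwards. The exact interleaving depends on timing and core count; only the order of the => results is guaranteed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

var log = new List<string>(); // written from worker threads, so guarded by a lock

int Process(int item)
{
    lock (log) log.Add($"+{item}");
    Thread.Sleep(new Random(item).Next(100, 1000));
    lock (log) log.Add($"-{item}");
    return item;
}

IEnumerable<int> OrderedParallelImmediateSelect(IEnumerable<int> source) =>
    source.AsParallel()
          .AsOrdered()                                        // results keep source order
          .WithMergeOptions(ParallelMergeOptions.NotBuffered) // no output buffering before the consumer
          .Select(Process);

var output = new List<int>();
foreach (var it in OrderedParallelImmediateSelect(Enumerable.Range(0, 20)))
{
    lock (log) log.Add($"=>{it}");
    output.Add(it);
}

Console.WriteLine(string.Join(" ", log));
```

    In a typical run, the first => entries should show up among the + and - entries rather than all at the end, unlike the output shown in the question.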