Tags: c#, .net, parallel-processing, plinq, parallel-extensions

Using Parallel LINQ extensions to union two sequences, how can one yield the fastest results first?


Let's say I have two sequences returning integers 1 to 5.

The first returns 1, 2 and 3 very fast, but 4 and 5 take 200ms each.

public static IEnumerable<int> FastFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i > 3) Thread.Sleep(200);
        yield return i;
    }
}

The second returns 1, 2 and 3 with a 200ms delay, but 4 and 5 are returned fast.

public static IEnumerable<int> SlowFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i < 4) Thread.Sleep(200);
        yield return i;
    }
}

Unioning the two sequences gives me just the numbers 1 to 5, but Union enumerates them one after the other, so I still wait through every delay in both.

FastFirst().Union(SlowFirst());
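A minimal timing sketch of the sequential case (the SequentialUnionDemo class and its Measure helper are illustrative names, not part of the question). Enumerable.Union enumerates its first argument to the end before starting on the second, so FastFirst().Union(SlowFirst()) takes roughly 400 ms + 600 ms = 1000 ms to finish, even though all five distinct values have arrived by about 400 ms. With the operands swapped, the first value does not appear until about 200 ms and the last distinct value until about 600 ms.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

public static class SequentialUnionDemo
{
    // Same as the question: 1, 2, 3 immediately, then 4 and 5 after 200 ms each.
    public static IEnumerable<int> FastFirst()
    {
        for (int i = 1; i < 6; i++)
        {
            if (i > 3) Thread.Sleep(200);
            yield return i;
        }
    }

    // Same as the question: 1, 2, 3 after 200 ms each, then 4 and 5 immediately.
    public static IEnumerable<int> SlowFirst()
    {
        for (int i = 1; i < 6; i++)
        {
            if (i < 4) Thread.Sleep(200);
            yield return i;
        }
    }

    // Enumerates a sequence, recording the elapsed milliseconds at which each value arrived,
    // and reports the total time until the sequence ended.
    public static List<(int Value, long ElapsedMs)> Measure(IEnumerable<int> source, out long totalMs)
    {
        var stopwatch = Stopwatch.StartNew();
        var arrivals = new List<(int Value, long ElapsedMs)>();
        foreach (var value in source)
        {
            arrivals.Add((value, stopwatch.ElapsedMilliseconds));
        }
        totalMs = stopwatch.ElapsedMilliseconds;
        return arrivals;
    }
}
```

Calling Measure(FastFirst().Union(SlowFirst()), out var total) and printing the arrivals makes the gap between "last distinct value" and "sequence finished" visible.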

I cannot know in advance which of the two sequences will be delayed at which point, so choosing the order of the operands to Union cannot solve this for me. Therefore, I would like to parallelise the union, to minimise the (artificial) delay in my example.

A real-world scenario: I have a cache that returns some entities, and a datasource that returns all entities. I'd like to return an iterator from a method that internally parallelises the requests to both the cache and the datasource, so that the cached results are yielded as fast as possible.
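For the cache-plus-datasource case, the two sources will often return different object instances for the same entity, so deduplicating by default equality (as Union or a HashSet of entities does) may not catch them. A minimal sketch, assuming each entity exposes an identifying key (the Entity type, its Id and Origin properties, and DistinctByKey are hypothetical names): keep the first occurrence of each key and yield it immediately, so whichever source delivers an entity first wins. On .NET 6 and later, the built-in Enumerable.DistinctBy should serve the same purpose.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical entity type: Id identifies the entity, Origin records which source produced it.
public sealed class Entity
{
    public int Id { get; }
    public string Origin { get; }

    public Entity(int id, string origin)
    {
        Id = id;
        Origin = origin;
    }
}

public static class EntityMerge
{
    // Streams the source, yielding only the first element seen for each key.
    public static IEnumerable<TSource> DistinctByKey<TSource, TKey>(
        this IEnumerable<TSource> source,
        Func<TSource, TKey> keySelector)
    {
        var seenKeys = new HashSet<TKey>();
        foreach (var item in source)
        {
            // HashSet<T>.Add returns false when the key is already present.
            if (seenKeys.Add(keySelector(item)))
            {
                yield return item;
            }
        }
    }
}
```

Applied to the merged stream of cache and datasource results, this keeps the cached copy of any entity the cache returned first and drops the datasource duplicate.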

Note 1: I realise this still wastes CPU cycles; I'm not asking how I can prevent the sequences from iterating over their slow elements, just how I can union them as fast as possible.

Update 1: I've adapted achitaka-san's great response to accept multiple producers, and to use ContinueWhenAll so that the BlockingCollection's CompleteAdding is called exactly once. I've put it here because comments don't support code formatting. Any further feedback would be great!

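using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static IEnumerable<TResult> SelectAsync<TResult>(
    params IEnumerable<TResult>[] producers)
{
    var resultsQueue = new BlockingCollection<TResult>();

    var taskList = new HashSet<Task>();
    foreach (var producer in producers)
    {
        // Copy the loop variable so each lambda captures its own producer
        // (required before C# 5, harmless afterwards).
        var current = producer;
        taskList.Add(
            Task.Factory.StartNew(
                () =>
                    {
                        foreach (var product in current)
                        {
                            resultsQueue.Add(product);
                        }
                    }));
    }

    // Once every producer task has finished, mark the collection complete so the consumer's loop ends.
    Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());

    // Items are yielded in the order the producers add them, i.e. as soon as they arrive.
    return resultsQueue.GetConsumingEnumerable();
}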
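On the request for feedback: in the version above, ContinueWhenAll runs whether or not the producer tasks succeed, so if a producer throws, the collection is completed and the consumer sees a silently truncated sequence with no error. TaskFactory.ContinueWhenAll also throws an ArgumentException when given an empty task array. A minimal sketch of one way to address both, assuming you want producer exceptions rethrown to the consumer after the last item (the Merge class and AsArrived method are hypothetical names): it uses Task.WhenAll, which completes immediately for an empty array, and waits on it once the queue has drained. Unlike the original, it is an iterator, so the producers only start when enumeration begins.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Merge
{
    // Yields items from all producers in arrival order. Because this is an iterator,
    // the producer tasks only start when enumeration begins.
    public static IEnumerable<T> AsArrived<T>(params IEnumerable<T>[] producers)
    {
        var queue = new BlockingCollection<T>();

        // LongRunning hints that each producer may block for a while (e.g. on I/O).
        Task[] tasks = producers
            .Select(producer => Task.Factory.StartNew(() =>
            {
                foreach (var item in producer)
                {
                    queue.Add(item);
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        // Task.WhenAll completes immediately for an empty array, unlike ContinueWhenAll.
        Task allDone = Task.WhenAll(tasks);

        // This continuation runs whether the producers succeeded or faulted.
        allDone.ContinueWith(_ => queue.CompleteAdding());

        foreach (var item in queue.GetConsumingEnumerable())
        {
            yield return item;
        }

        // Rethrows producer failures (as an AggregateException) instead of silently truncating.
        allDone.Wait();
    }
}
```

Chained as Merge.AsArrived(FastFirst(), SlowFirst()).Distinct(), it should yield 1, 2 and 3 almost immediately, then 4 and 5 from FastFirst at roughly 200 ms and 400 ms.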

Solution

  • Take a look at this. The first method returns everything in the order the results arrive. The second filters out duplicates. If you chain them, I think you will get the result you want.

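    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    public static class Class1
    {
        public static IEnumerable<TResult> SelectAsync<TResult>(
            IEnumerable<TResult> producer1,
            IEnumerable<TResult> producer2,
            int capacity)
        {
            var resultsQueue = new BlockingCollection<TResult>(capacity);
            // Number of producers still running. Interlocked.Decrement makes the
            // "last one out calls CompleteAdding" check safe across threads; two plain
            // bool flags can race so that neither task sees both set, and the consumer hangs.
            var producersRemaining = 2;
    
            Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var product in producer1)
                    {
                        resultsQueue.Add(product);
                    }
                }
                finally
                {
                    // Runs even if producer1 throws, so the consumer is never left waiting.
                    if (Interlocked.Decrement(ref producersRemaining) == 0) { resultsQueue.CompleteAdding(); }
                }
            });
    
            Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var product in producer2)
                    {
                        resultsQueue.Add(product);
                    }
                }
                finally
                {
                    if (Interlocked.Decrement(ref producersRemaining) == 0) { resultsQueue.CompleteAdding(); }
                }
            });
    
            // Yields items in the order the producers add them.
            return resultsQueue.GetConsumingEnumerable();
        }
    
    
        // Passes through only the first occurrence of each value, without buffering the rest.
        public static IEnumerable<TResult> SelectAsyncUnique<TResult>(this IEnumerable<TResult> source)
        {
            var knownResults = new HashSet<TResult>();
            foreach (TResult result in source)
            {
                // HashSet<T>.Add returns false if the value has already been seen.
                if (knownResults.Add(result)) { yield return result; }
            }
        }
    }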
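Since the question asks about PLINQ specifically, a possibly simpler alternative is to let PLINQ run the sources in parallel and hand items over without buffering. This is an unbenchmarked sketch: ParallelMergeOptions.NotBuffered asks PLINQ to pass each element to the consumer as soon as a worker produces it, and WithDegreeOfParallelism(sources.Length) is intended, though not guaranteed, to give each source its own worker. AsSequential() then hands the results to the ordinary LINQ to Objects Distinct, which in the current implementation yields each new value as soon as it is seen. The PlinqMerge class and AsArrivedDistinct method are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PlinqMerge
{
    public static IEnumerable<T> AsArrivedDistinct<T>(params IEnumerable<T>[] sources)
    {
        return sources
            .AsParallel()
            // Intended to give every source its own worker (the degree must be at least 1).
            .WithDegreeOfParallelism(Math.Max(1, sources.Length))
            // Prevent PLINQ from falling back to sequential execution.
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            // Hand each element to the consumer as soon as a worker produces it.
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .SelectMany(source => source)
            // Leave PLINQ here; the Distinct below runs on the consuming thread.
            .AsSequential()
            .Distinct();
    }
}
```

The output order is whatever order the workers happen to produce, which is the point here; if you need to confirm the streaming behaviour, timing it with the question's FastFirst and SlowFirst is the way to check.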