
ForEachAsync with Result


I'm trying to change Stephen Toub's ForEachAsync<T> extension method into an extension which returns a result...

Stephen's extension:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current); 
        })); 
}

My approach (not working; the tasks get executed, but the result is wrong):

public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source,
    int degreeOfParallelism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run<TResult>(async () =>
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); // When I "return await",
                        // I get good results but only one per partition 
            return default(TResult);
        }));
}

I know I somehow have to return the results from the last part (with WhenAll?), but I haven't figured out how to do it yet.

Update: The result I get is just degreeOfParallelism nulls (I guess because of default(TResult)), even though all the tasks get executed. I also tried return await body(...); then the results were correct, but only degreeOfParallelism tasks got executed.
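Both symptoms come from the same fact: GetPartitions(n) always returns a list of exactly n enumerators (some of which may turn out to be empty), and the query turns each enumerator into one Task.Run call. So Task.WhenAll can only ever produce n results, and a partition whose lambda does return await body(...) stops after its first item. The minimal sketch below just counts both numbers; the PartitionCountDemo class is a hypothetical helper for illustration, not part of the question or answer.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class PartitionCountDemo
{
    // Hypothetical helper: reports how many partitions GetPartitions hands out
    // and how many source items those partitions cover in total.
    public static (int Partitions, int Items) Count<T>(IEnumerable<T> source, int degreeOfParallelism)
    {
        IList<IEnumerator<T>> partitions =
            Partitioner.Create(source).GetPartitions(degreeOfParallelism);

        int items = 0;
        foreach (IEnumerator<T> partition in partitions)
        {
            // Drain each partition in turn; together they yield every item once.
            using (partition)
            {
                while (partition.MoveNext())
                    items++;
            }
        }

        // A query that creates one Task per partition yields partitions.Count
        // results, not one result per item.
        return (partitions.Count, items);
    }
}
```

For a source of 10 items and a degree of parallelism of 4, this reports 4 partitions covering 10 items, which matches the "degreeOfParallelism results" behaviour described in the update.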


Solution

  • Your LINQ query can only ever produce as many results as there are partitions: you're projecting each partition into a single result.

    If you don't care about the order, you just need to assemble the results of each partition into a list, then flatten them afterwards.

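    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    public static async Task<TResult[]> ExecuteInParallel<T, TResult>(this IList<T> source, int degreeOfParallelism, Func<T, Task<TResult>> body)
    {
        // One task per partition; each task collects the results of its own partition.
        var lists = await Task.WhenAll<List<TResult>>(
            Partitioner.Create(source).GetPartitions(degreeOfParallelism)
                .Select(partition => Task.Run<List<TResult>>(async () =>
                {
                    var list = new List<TResult>();
                    using (partition)
                    {
                        while (partition.MoveNext())
                        {
                            list.Add(await body(partition.Current));
                        }
                    }
                    return list;
                })));

        // Flatten the per-partition lists into one array (input order is not guaranteed).
        return lists.SelectMany(list => list).ToArray();
    }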
    

    (I've renamed this from ForEachAsync: ForEach sounds imperative, which suits the Func<T, Task> in the original, whereas this method fetches results. A foreach loop doesn't have a result; this does.)
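The solution above assumes you don't care about the order of the results. If input order does matter, one possible approach (a sketch under that assumption, not part of the original answer) is to partition the indices rather than the items and have each worker write into a preallocated array. The class name OrderedParallelExtensions and the method name ExecuteInParallelOrdered are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class OrderedParallelExtensions
{
    // Hypothetical order-preserving variant: results[i] always holds the result
    // of body(source[i]), whatever order the tasks happen to finish in.
    public static async Task<TResult[]> ExecuteInParallelOrdered<T, TResult>(
        this IList<T> source, int degreeOfParallelism, Func<T, Task<TResult>> body)
    {
        var results = new TResult[source.Count];

        // Partition the indices instead of the items, so each worker knows
        // which slot of the results array to fill.
        var partitions = Partitioner.Create(Enumerable.Range(0, source.Count))
                                    .GetPartitions(degreeOfParallelism);

        await Task.WhenAll(partitions.Select(partition => Task.Run(async () =>
        {
            using (partition)
            {
                while (partition.MoveNext())
                {
                    int index = partition.Current;
                    // Each index is handed out exactly once, so no two workers
                    // ever write to the same array element.
                    results[index] = await body(source[index]);
                }
            }
        })));

        return results;
    }
}
```

Writing to distinct array slots avoids both the per-partition lists and any sorting step; awaiting Task.WhenAll ensures every write has completed before the array is returned.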