c# .net-core parallel.foreach azure-function-async

Use Parallel.For in batches in dotnet core


I am using an HTTP-triggered function in .NET Core, where I receive the HTTP request data in JSON format. I need to insert these values into a Google Merchant Center account. There are almost 9000 rows (the data is dynamic each time) that need to be inserted. How can I implement Parallel.For logic so that this executes faster? Currently I am using a for loop like the one below, but it takes too long.

string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
dynamic body = JsonConvert.DeserializeObject(requestBody);
for (int i =0;i<body.Count;i++)
{
  Product newProduct = InsertProduct(merchantId, websiteUrl,body[i]);
}

Solution

  • I created a small example; maybe you can find the approach there that fits your case best.

    dotnet fiddle Example

    There are 3 options:

    In sequence

    As the title says, every item is processed in sequence. A very safe method, but not the fastest way to process 9000 items :)

    var list = GenerateItems();
    var count = list.Count();
    for(var i = 0; i < count; i++) 
    {
        InsertInDatabaseAsync($"{i}", list.ElementAt(i)).GetAwaiter().GetResult();
    }
    

    With Parallel.For Library

    As mentioned in the comments, it is good for CPU-bound processing but has some shortcomings with async methods (here): each iteration blocks a thread with GetAwaiter().GetResult() while it waits.

    var list = GenerateItems();
    var count = list.Count();
    var options = new ParallelOptions{MaxDegreeOfParallelism = MAX_DEGREE_OF_PARALLELISM};
    Parallel.For(0, count, options, (i) => 
    {
        InsertInDatabaseAsync($"{i}", list.ElementAt(i)).GetAwaiter().GetResult();
    });
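
    If you are on .NET 6 or later, Parallel.ForEachAsync may be a better fit here than Parallel.For, because it awaits the async body instead of blocking a thread per item. A minimal sketch, assuming .NET 6+; the InsertInDatabaseAsync below is a hypothetical stand-in that only simulates latency, not the real insert:

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public static class ParallelForEachAsyncSketch
    {
        // Hypothetical stand-in for the real insert: waits briefly, then records the item.
        private static async Task InsertInDatabaseAsync(string key, int item, ConcurrentBag<int> sink)
        {
            await Task.Delay(10); // simulates I/O latency
            sink.Add(item);
        }

        // Processes itemCount items and returns how many were inserted.
        public static async Task<int> RunAsync(int itemCount, int maxDegreeOfParallelism)
        {
            var list = Enumerable.Range(0, itemCount).ToList();
            var sink = new ConcurrentBag<int>();
            var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };

            // The body is awaited, so no thread is blocked while an insert is pending.
            await Parallel.ForEachAsync(list, options, async (item, cancellationToken) =>
            {
                await InsertInDatabaseAsync($"{item}", item, sink);
            });

            return sink.Count;
        }
    }
    ```

    With MaxDegreeOfParallelism set to 4, for example, at most four inserts are in flight at any time.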
    

    With Async-Await

    I think this fits your example best. Processing of every item starts immediately as its own Task, so the items run concurrently. (The async extension is copied from here.)

    var list = GenerateItems();
    var count = list.Count();
    
    // Extensions method see in referenced SO answer
    ForEachAsync(count, list, async (item, index) => 
    {
        await InsertInDatabaseAsync($"{index}", item);
    }).GetAwaiter().GetResult();
    

    ...Updated

    Thanks for the comments. I have updated the async-await implementation to a simpler one:

    private static async Task ForEachAsync<T>(IEnumerable<T> enumerable, Func<T, int, Task> asyncFunc)
    {
        var itemsCount = enumerable.Count();
        var tasks = new Task[itemsCount];
        int i = 0;
        foreach (var t in enumerable)
        {
            tasks[i] = asyncFunc(t, i);
            i++;
        }
        await Task.WhenAll(tasks);
    }
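
    The ForEachAsync helper above starts every task at once, which with 9000 remote calls may run into rate limits or connection exhaustion. One possible variation is to cap the number of concurrent calls with a SemaphoreSlim. This is only a sketch: the maxConcurrency parameter and the MeasurePeakAsync demo are my own additions for illustration, not part of the original helper.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public static class ThrottledForEach
    {
        // Like ForEachAsync, but at most maxConcurrency callbacks run at the same time.
        public static async Task ForEachAsync<T>(
            IEnumerable<T> items, int maxConcurrency, Func<T, int, Task> asyncFunc)
        {
            using var gate = new SemaphoreSlim(maxConcurrency);
            var tasks = items.Select(async (item, index) =>
            {
                await gate.WaitAsync();   // wait for a free slot
                try { await asyncFunc(item, index); }
                finally { gate.Release(); } // free the slot even if the callback throws
            }).ToList();                  // ToList starts all the wrapper tasks
            await Task.WhenAll(tasks);
        }

        // Demo: returns the highest number of callbacks observed running at once.
        public static async Task<int> MeasurePeakAsync(int itemCount, int maxConcurrency)
        {
            var sync = new object();
            int running = 0, peak = 0;
            await ForEachAsync(Enumerable.Range(0, itemCount), maxConcurrency, async (item, index) =>
            {
                lock (sync) { running++; peak = Math.Max(peak, running); }
                await Task.Delay(5);      // simulates the insert call
                lock (sync) { running--; }
            });
            return peak;
        }
    }
    ```

    The right value for maxConcurrency depends on what the target service tolerates, so it is worth measuring a few values.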
    

    I also added MAX_DEGREE_OF_PARALLELISM, set to 1. This has a huge impact on the parallel processing, as described in the comments.
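
    Since the question title asks about batches: another option is to split the list into fixed-size chunks and await each chunk before starting the next. A rough sketch, assuming .NET 6+ for Enumerable.Chunk; RunInBatchesAsync and its parameters are hypothetical names, and insertAsync stands for whatever async insert you have:

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    public static class BatchedInsert
    {
        // Runs insertAsync for all items, batchSize items at a time.
        // Returns the number of batches that were processed.
        public static async Task<int> RunInBatchesAsync<T>(
            IEnumerable<T> items, int batchSize, Func<T, Task> insertAsync)
        {
            int batches = 0;
            foreach (T[] batch in items.Chunk(batchSize))
            {
                // Start every insert in this batch, then wait for all of them.
                await Task.WhenAll(batch.Select(insertAsync));
                batches++;
            }
            return batches;
        }
    }
    ```

    Each batch waits for its slowest item before the next batch starts, so this is simple but probably not the fastest option.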
