Tags: c#, asynchronous, parallel-processing, parallel.foreach

How can we manage async calls inside Parallel.ForEach?


I am new to asynchronous programming. I use the following code to collect data from a third-party API, and every time I run it I get different responses. Is my approach wrong?

Parallel.ForEach(products, item =>
{
    GetProductsInfo(item);
});

public async Task<Product> GetProductsInfo(Product product)
{
    var restClientProduct = new RestClient("URL");
    var restRequestProduct = new RestRequest(Method.POST);
    var proudctRequestJson = JsonConvert.SerializeObject(new ProudctRequest()
    {
        product_code = product.product_code,

    });
    restRequestProduct.AddHeader("cache-control", "no-cache");
    restRequestProduct.AddHeader("Content-Length",
        proudctRequestJson.Count().ToString());
    restRequestProduct.AddHeader("Content-Type", "application/json");
    restRequestProduct.AddHeader("Accept", "application/json");
    restRequestProduct.AddParameter("undefined", proudctRequestJson,
        ParameterType.RequestBody);
    var responseProduct = GetResponseContentAsync(
        restClientProduct, restRequestProduct).Result;
    if (responseProduct.StatusCode == HttpStatusCode.OK)
    {
        // set values form the responseProduct to the product
    }
    return product;
}

private Task<IRestResponse> GetResponseContentAsync(RestClient theClient,
    RestRequest theRequest)
{
    var tcs = new TaskCompletionSource<IRestResponse>();
    theClient.ExecuteAsync(theRequest, response =>
    {
        tcs.SetResult(response);
    });
    return tcs.Task;
}

Solution

  • The part of your code you have shown us does not run asynchronously. You are calling .Result on GetResponseContentAsync(), which blocks the thread until the request finishes. That means that by the time Parallel.ForEach completes, all the HTTP requests will have completed.

    If you are using await somewhere in that block of code you replaced with

    // set values form the responseProduct to the product
    

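    then it's possible that the results have not arrived by the time Parallel.ForEach returns. That is because Parallel.ForEach does not support asynchronous code: your lambda discards the Task that GetProductsInfo returns, so Parallel.ForEach has nothing to wait on and moves on as soon as each call reaches its first incomplete await.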
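The effect described above can be reproduced without any HTTP calls. The following is a minimal sketch, not the asker's code: `SimulatedCallAsync` and the 500 ms delay are hypothetical stand-ins for a GetProductsInfo that really awaits. It counts how many calls had finished at the moment Parallel.ForEach returned.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FireAndForgetDemo
{
    private static int _completed;

    // Hypothetical stand-in for an async GetProductsInfo: awaits, then records completion.
    private static async Task SimulatedCallAsync(int delayMs)
    {
        await Task.Delay(delayMs);               // simulated network latency (assumption)
        Interlocked.Increment(ref _completed);
    }

    // Returns how many simulated calls had completed when Parallel.ForEach returned.
    public static int CountCompletedWhenLoopReturns(int itemCount, int delayMs)
    {
        Interlocked.Exchange(ref _completed, 0);

        Parallel.ForEach(Enumerable.Range(0, itemCount), item =>
        {
            // The returned Task is discarded, as in the question's lambda,
            // so the loop has nothing to wait on.
            _ = SimulatedCallAsync(delayMs);
        });

        return Volatile.Read(ref _completed);
    }
}
```

Calling `FireAndForgetDemo.CountCompletedWhenLoopReturns(20, 500)` should, on a typical run, report far fewer than 20 completions (most likely none), because the loop returns long before the delays elapse.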

    Let's assume that GetProductsInfo is actually running asynchronously.

    Then the problem is: Parallel.ForEach is not waiting for my asynchronous operations to finish. There are a couple of ways to handle this.

    1. Implement your own ForEachAsync. This was requested for .NET, and a built-in Parallel.ForEachAsync has shipped since .NET 6. For earlier versions, the issue where it was requested contains a sample implementation:
    /// <summary>
    ///     Executes a foreach asynchronously.
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="source">The source.</param>
    /// <param name="dop">The degrees of parallelism.</param>
    /// <param name="body">The body.</param>
    /// <returns></returns>
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in System.Collections.Concurrent.Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                {
                    while (partition.MoveNext())
                        await body(partition.Current);
                }
            }));
    }
    

    That is written as an extension method, so you would use it like this:

    await products.ForEachAsync(10, GetProductsInfo);
    

    Where 10 is the number of requests you would like to run at a time.
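If the project targets .NET 6 or later (an assumption, since the question does not say), the built-in Parallel.ForEachAsync can take the place of the hand-written extension method. The sketch below uses a hypothetical `FetchAsync` in place of GetProductsInfo and collects results in a ConcurrentBag; treat it as an illustration of the call shape rather than a drop-in replacement.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class BuiltInForEachAsyncDemo
{
    // Hypothetical stand-in for GetProductsInfo: waits briefly, then returns a value.
    private static async Task<int> FetchAsync(int id)
    {
        await Task.Delay(50);                    // simulated network latency (assumption)
        return id * 2;
    }

    // Processes ids 1..count with at most maxParallel calls in flight at once.
    public static async Task<int[]> RunAsync(int count, int maxParallel)
    {
        var results = new ConcurrentBag<int>();  // thread-safe collection for the results
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        // Unlike Parallel.ForEach, this overload awaits every async body before completing.
        await Parallel.ForEachAsync(Enumerable.Range(1, count), options, async (id, cancellationToken) =>
        {
            results.Add(await FetchAsync(id));
        });

        return results.OrderBy(x => x).ToArray();
    }
}
```

Here `MaxDegreeOfParallelism` plays the same role as the `dop` argument of the extension method above.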

    2. You can use something like:
    Task.WaitAll(items.Select(i => GetProductsInfo(i)).ToArray());
    

    This will run the requests asynchronously, but block the calling thread until they all finish. Alternatively, you can await them, so it doesn't block the calling thread:

    await Task.WhenAll(items.Select(i => GetProductsInfo(i)));
    

    However, both of these methods fire off all of the requests at once. If you know you will only ever have a small number, that's fine. But if you might have a very large number, you could flood the web service. Parallel.ForEach, or the ForEachAsync implementation above, limits how many requests are in flight at any one time.
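One way to keep the Task.WhenAll style while still capping concurrency is to gate each call with a SemaphoreSlim. This is a sketch of that general technique, not something from the original answer; the `ThrottledWhenAll` class and its `RunAsync` method are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledWhenAll
{
    // Runs body for every item, with at most maxConcurrent bodies in flight at once.
    // Results come back in the same order as the source items.
    public static async Task<TResult[]> RunAsync<T, TResult>(
        IEnumerable<T> source, int maxConcurrent, Func<T, Task<TResult>> body)
    {
        using var gate = new SemaphoreSlim(maxConcurrent);

        var tasks = source.Select(async item =>
        {
            await gate.WaitAsync();              // wait for a free slot without blocking a thread
            try
            {
                return await body(item);
            }
            finally
            {
                gate.Release();                  // free the slot, even if body throws
            }
        }).ToList();                             // materialize so each task is created exactly once

        return await Task.WhenAll(tasks);
    }
}
```

With the code from the question, a call might look like `await ThrottledWhenAll.RunAsync(items, 10, i => GetProductsInfo(i));`, where 10 is again the maximum number of simultaneous requests.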

    If you use any of these methods to await the responses, then you really should await GetResponseContentAsync instead of using .Result:

    var responseProduct = await GetResponseContentAsync(restClientProduct, restRequestProduct);
    

    Using async/await is especially important in ASP.NET, where the number of available threads is limited and every blocked thread is one that cannot serve another request.
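To show the whole chain awaited end to end without depending on RestSharp, here is a small self-contained sketch. `ExecuteWithCallback` is a hypothetical callback-style API standing in for RestClient.ExecuteAsync, and the upper-casing step is only a placeholder for "set values from the response".

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AwaitInsteadOfResult
{
    // Hypothetical callback-style API standing in for RestClient.ExecuteAsync.
    private static void ExecuteWithCallback(string request, Action<string> onDone)
    {
        ThreadPool.QueueUserWorkItem(_ =>
        {
            Thread.Sleep(100);                   // simulated network latency (assumption)
            onDone("response to " + request);
        });
    }

    // Same wrapping pattern as GetResponseContentAsync in the question.
    private static Task<string> GetResponseAsync(string request)
    {
        var tcs = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        ExecuteWithCallback(request, response => tcs.SetResult(response));
        return tcs.Task;
    }

    // Awaiting (instead of calling .Result) frees the calling thread while the request is in flight.
    public static async Task<string> GetInfoAsync(string productCode)
    {
        string response = await GetResponseAsync(productCode);
        return response.ToUpperInvariant();      // placeholder for processing the response
    }
}
```

Callers then await `GetInfoAsync` in turn, so no thread is held for the duration of the request anywhere in the chain.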