Parallel foreach loop with async code doesn't complete correctly for unknown reason

I am trying to rewrite a foreach loop to use Parallel.ForEach, since every document I need to process can be handled as a separate entity; there are no dependencies whatsoever.

The code is fairly straightforward, as below:

  • Query the DB
  • Read each document in the loop
  • For each document do two web calls and add results to the document
  • Add updated document to list
  • BulkImport list to DB

Since the web API calls are the slowest part due to network delay, I wanted to process them in parallel to save time, so I wrote this code:

private async Task<List<String>> FetchDocumentsAndBuildList(string brand)
{
    using (var client = new DocumentClient(new Uri(cosmosDBEndpointUrl), cosmosDBPrimaryKey))
    {
        List<string> formattedList = new List<string>();
        FeedOptions queryOptions = new FeedOptions
        {
            MaxItemCount = -1,
            PartitionKey = new PartitionKey(brand)
        };

        var query = client.CreateDocumentQuery<Document>(UriFactory.CreateDocumentCollectionUri(cosmosDBName, cosmosDBCollectionNameRawData), $"SELECT TOP 2 * from c where c.brand = '{brand}'", queryOptions).AsDocumentQuery();

            var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };

            Parallel.ForEach(await query.ExecuteNextAsync<Document>(), options, async singleDocument =>
            {
                JObject originalData = singleDocument.GetPropertyValue<JObject>("BasicData");

                if (originalData != null)
                {
                    var artNo = originalData.GetValue("artno");
                    if (artNo != null)
                    {
                        string strArtNo = artNo.ToString();
                        string productNumber = strArtNo.Substring(0, 7);
                        string colorNumber = strArtNo.Substring(7, 3);
                        string HmGoeUrl = $"https://xxx,xom/Online/{strArtNo}/en";
                        string sisApiUrl = $"https:/{productNumber}/{colorNumber}?&maxnumberofstores=10&brand=000&channel=02";
                        string HttpFetchMethod = "GET";

                        JObject detailedDataResponse = await DataFetcherAsync(HmGoeUrl, HttpFetchMethod);
                        JObject inventoryData = await DataFetcherAsync(sisApiUrl, HttpFetchMethod);

                        if (detailedDataResponse != null)
                        {
                            JObject productList = (JObject)detailedDataResponse["product"];

                            if (productList != null)
                            {
                                var selectedIndex = productList["articlesList"].Select((x, index) => new { code = x.Value<string>("code"), Node = x, Index = index })
                                .Single(x => x.code == strArtNo)
                                .Index;

                                detailedDataResponse = (JObject)productList["articlesList"][selectedIndex];
                            }
                        }

                        singleDocument.SetPropertyValue("DetailedData", detailedDataResponse);
                        singleDocument.SetPropertyValue("InventoryData", inventoryData);
                        singleDocument.SetPropertyValue("consumer", "NWS");
                    }
                }
                formattedList.Add(Newtonsoft.Json.JsonConvert.SerializeObject(singleDocument));
            });

            //foreach (Document singleDocument in await query.ExecuteNextAsync<Document>())
            //{
            //    JObject originalData = singleDocument.GetPropertyValue<JObject>("BasicData");

            //    if(originalData != null)
            //    {
            //        var artNo = originalData.GetValue("artno");
            //        if(artNo != null)
            //        {
            //            string strArtNo = artNo.ToString();
            //            string productNumber = strArtNo.Substring(0, 7);
            //            string colorNumber = strArtNo.Substring(7, 3);
            //            string HmGoeUrl = $"https:/xxx.xom/Online/{strArtNo}/en";
            //            string sisApiUrl = $"https://yyy.xom&maxnumberofstores=10&brand=000&channel=02";
            //            string HttpFetchMethod = "GET";

            //            JObject detailedDataResponse = await DataFetcherAsync(HmGoeUrl, HttpFetchMethod);
            //            JObject inventoryData = await DataFetcherAsync(sisApiUrl, HttpFetchMethod);

            //            if(detailedDataResponse != null)
            //            {
            //                JObject productList = (JObject)detailedDataResponse["product"];

            //                if(productList != null)
            //                {
            //                    var selectedIndex = productList["articlesList"].Select((x, index) => new { code = x.Value<string>("code"), Node = x, Index = index })
            //                    .Single(x => x.code == strArtNo)
            //                    .Index;

            //                    detailedDataResponse = (JObject)productList["articlesList"][selectedIndex];
            //                }
            //            }

            //            singleDocument.SetPropertyValue("DetailedData", detailedDataResponse);
            //            singleDocument.SetPropertyValue("InventoryData", inventoryData);
            //            singleDocument.SetPropertyValue("consumer", "NWS");
            //        }
            //    }
            //    formattedList.Add(Newtonsoft.Json.JsonConvert.SerializeObject(singleDocument));
            //}
        return formattedList;
    }
}

If I add a breakpoint in the loop, I can see the correct values are assigned to each variable, but for some reason the formattedList returned always has 0 entries and I cannot figure out why.

Commented out is the original foreach loop, which works just fine but is slooooow.

--- EDIT --- This is how I am calling this code from the parent method:

   log.LogInformation($"Starting creation of DocumentList for BulkImport at: {DateTime.Now}");

   var documentsToImportInBatch = await FetchDocumentsAndBuildList(brand);

   log.LogInformation($"BulkExecutor DocumentList has: {documentsToImportInBatch.Count} entries, created at: {DateTime.Now}");


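  • The problem here is that Parallel.ForEach doesn't understand that each call to your lambda returns a Task which needs to be awaited before the ForEach can be considered complete. Its body parameter is an Action<T>, so the async lambda is compiled as async void and control returns to the loop at the first await.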

    As a result, the continuation after the await isn't invoked before your function exits, and this is why formattedList has zero elements in it.

    You can easily prove this with a code sample such as:

    Parallel.ForEach(Enumerable.Range(0, 100), async singleDocument => await Task.Delay(9999));
    Console.WriteLine("Done");

    Done will be printed almost immediately.
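
To make the effect measurable rather than just visible, here is a minimal sketch that counts how many loop bodies have actually finished at the moment Parallel.ForEach returns, and again after waiting. The class name AsyncVoidDemo, the Run method and its parameters are made up for this illustration; only Parallel.ForEach, Task.Delay, Interlocked and Volatile are real .NET APIs.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Hypothetical helper: reports how many async bodies had completed
    // when Parallel.ForEach returned, and how many after an extra wait.
    public static (int atReturn, int afterWait) Run(int items = 10, int delayMs = 500)
    {
        int completed = 0;

        // The async lambda binds to Action<int>, i.e. it becomes async void.
        // Parallel.ForEach only sees each call return at its first await.
        Parallel.ForEach(Enumerable.Range(0, items), async i =>
        {
            await Task.Delay(delayMs);            // simulated web call
            Interlocked.Increment(ref completed); // the "continuation"
        });

        int atReturn = Volatile.Read(ref completed);

        // Block long enough for the orphaned continuations to run.
        Thread.Sleep(delayMs * 4);
        int afterWait = Volatile.Read(ref completed);

        return (atReturn, afterWait);
    }

    public static void Main()
    {
        var (atReturn, afterWait) = Run();
        Console.WriteLine($"Finished when ForEach returned: {atReturn}");
        Console.WriteLine($"Finished after waiting:         {afterWait}");
    }
}
```

On a typical machine the first number should be 0 and the second should equal the item count: the work does happen, just after the loop (and, in the question, after the return statement) has already moved on.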

    For I/O-bound parallelism, you could instead use Task.WhenAll to parallelize your async web scraping calls:

    var myDocuments = await query.ExecuteNextAsync<Document>();
    var myScrapingTasks = myDocuments.Select(async singleDocument =>
    {
        // ... all of your web scraping code here
        // return the amended (mutated) document
        return JsonConvert.SerializeObject(singleDocument);
    });
    var results = await Task.WhenAll(myScrapingTasks);
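
For something you can run without Cosmos DB or the real endpoints, the following is a rough, self-contained sketch of the same pattern. FakeFetchAsync is a hypothetical stand-in for DataFetcherAsync, plain strings stand in for the documents, and the class and method names are invented for the example. It also starts the two calls per item together, which goes a step beyond the snippet above and assumes the two calls are independent.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for DataFetcherAsync: simulates network latency.
    private static async Task<string> FakeFetchAsync(string url)
    {
        await Task.Delay(100);
        return $"response({url})";
    }

    public static async Task<List<string>> BuildListAsync(IEnumerable<string> articleNumbers)
    {
        // Select with an async lambda yields one Task<string> per item.
        IEnumerable<Task<string>> tasks = articleNumbers.Select(async artNo =>
        {
            // Assumption: the two calls do not depend on each other,
            // so both are started before either is awaited.
            Task<string> detailTask = FakeFetchAsync($"detail/{artNo}");
            Task<string> inventoryTask = FakeFetchAsync($"inventory/{artNo}");

            string detail = await detailTask;
            string inventory = await inventoryTask;

            // Return a value instead of adding to a shared list,
            // so no locking is needed.
            return $"{artNo}|{detail}|{inventory}";
        });

        // WhenAll completes only when every task has completed;
        // the result array is in the same order as the input.
        string[] results = await Task.WhenAll(tasks);
        return results.ToList();
    }

    public static async Task Main()
    {
        List<string> list = await BuildListAsync(new[] { "0001", "0002", "0003" });
        Console.WriteLine($"{list.Count} entries");
        list.ForEach(Console.WriteLine);
    }
}
```

Returning the serialized value from each task and letting Task.WhenAll collect them also sidesteps the fact that List<T>.Add is not safe to call from several threads at once.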

    With respect to MaxDegreeOfParallelism: if you find that you need to throttle the number of concurrent scraping calls, the easiest approach would be to group the incoming documents into manageable chunks and process one chunk at a time. The Select((x, i) => ...) overload and GroupBy work wonders for this.
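
One way the chunking idea could look, as a sketch only: RunInChunksAsync, its parameters and the ChunkedThrottle class are names invented here, and the chunk size is something you would tune for the target API. Each chunk is awaited with Task.WhenAll before the next one starts, so at most chunkSize calls are in flight at any time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ChunkedThrottle
{
    // Hypothetical helper: runs 'work' over 'items', chunkSize items at a time.
    public static async Task<List<TResult>> RunInChunksAsync<T, TResult>(
        IEnumerable<T> items, int chunkSize, Func<T, Task<TResult>> work)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        var results = new List<TResult>();

        // Pair each item with its index, then group by index / chunkSize:
        // indexes 0..chunkSize-1 form group 0, the next chunkSize form group 1, ...
        var chunks = items
            .Select((item, index) => new { item, index })
            .GroupBy(x => x.index / chunkSize, x => x.item);

        foreach (var chunk in chunks)
        {
            // ToList() forces every task in this chunk to start now.
            List<Task<TResult>> tasks = chunk.Select(work).ToList();

            // Wait for the whole chunk before starting the next one.
            results.AddRange(await Task.WhenAll(tasks));
        }

        return results;
    }

    public static async Task Main()
    {
        List<int> squares = await RunInChunksAsync(Enumerable.Range(1, 10), 3, async n =>
        {
            await Task.Delay(50); // simulated web call
            return n * n;
        });
        Console.WriteLine(string.Join(", ", squares));
    }
}
```

The trade-off of fixed chunks is that each chunk is as slow as its slowest call; a SemaphoreSlim around each call is a common alternative when that matters.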