I have a Parallel.ForEach loop that iterates over a collection. Inside the loop, I make multiple network I/O calls. I used Task.ContinueWith and nested the subsequent async-await calls. The order of processing doesn't matter, but the data from each async call should be processed in a synchronized way. That is, for each iteration, the data retrieved from the first async call should be passed to the second async call. After the second async call finishes, the data from both async calls should be processed together.
Parallel.ForEach(someCollection, parallelOptions, async (item, state) =>
{
    Task<Country> countryTask = Task.Run(() => GetCountry(item.ID));
    //this is my first async call
    await countryTask.ContinueWith((countryData) =>
    {
        countries.Add(countryData.Result);
        Task<State> stateTask = Task.Run(() => GetState(countryData.Result.CountryID));
        //based on the data I receive in 'stateTask', I make another async call
        stateTask.ContinueWith((stateData) =>
        {
            states.Add(stateData.Result);
            // use data from both the async calls pass it to below function for some calculation
            // in a synchronized way (for a country, its corresponding state should be passed)
            myCollection.ConcurrentAddRange(SomeCalculation(countryData.Result, stateData.Result));
        });
    });
});
I tried the above without using ContinueWith and await, but it did not run in a synchronized way. Now the above code executes to completion, but no records get processed.
Any help with this please? Let me know if I can add more details.
As your methods involve I/O, they should be written to be truly asynchronous, not just run synchronously on the thread pool using Task.Run.
Then you could use Task.WhenAll in combination with Enumerable.Select:
var tasks = someCollection.Select(async item =>
{
    // Within one item the calls are awaited in order: country first, then state.
    var country = await GetCountryAsync(item.ID);
    var state = await GetStateAsync(country.CountryID);
    var calculation = SomeCalculation(country, state);
    return (country, state, calculation);
});
// The lists are filled from a single flow of control, so no locking is needed here.
foreach (var tuple in await Task.WhenAll(tasks))
{
    countries.Add(tuple.country);
    states.Add(tuple.state);
    myCollection.AddRange(tuple.calculation);
}
This would ensure that each country > state > calculation occurs sequentially, but each item is processed concurrently and asynchronously.
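To make that ordering concrete, here is a minimal, self-contained sketch of the same Select plus Task.WhenAll shape. The two local functions are hypothetical stand-ins for the real network calls (Task.Delay simulates I/O latency), and strings replace the Country and State types, so treat it as an illustration rather than the actual implementation:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for the real I/O methods. Task.Delay simulates
// network latency without blocking a thread-pool thread.
async Task<string> GetCountryAsync(int id)
{
    await Task.Delay(50);
    return $"country-{id}";
}

async Task<string> GetStateAsync(string country)
{
    await Task.Delay(50);
    return $"state-of-{country}";
}

var ids = new[] { 1, 2, 3 };

// Inside each lambda the two awaits run in order; the lambdas for
// different ids overlap in time.
var tasks = ids.Select(async id =>
{
    var country = await GetCountryAsync(id);
    var state = await GetStateAsync(country);
    return (country, state);
});

// Task.WhenAll yields the results in the same order as the input tasks,
// regardless of which task finished first.
var results = await Task.WhenAll(tasks);

foreach (var r in results)
{
    Console.WriteLine($"{r.country} -> {r.state}");
}
```

Because each state is requested from the country returned in the same lambda, a country can never be paired with another item's state.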
Update as per comment
using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();
int failures = 0;
var tasks = someCollection.Select(async item =>
{
    await semaphore.WaitAsync(cts.Token);
    try
    {
        var country = await GetCountryAsync(item.ID);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);
        // A success resets the counter, so only consecutive failures count.
        Interlocked.Exchange(ref failures, 0);
        return (country, state, calculation);
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        semaphore.Release();
    }
});
The semaphore ensures a maximum of 2 concurrent async operations, and after 10 consecutive exceptions the cancellation token cancels every task still waiting on the semaphore. Operations already in flight run to completion, because the token is only passed to WaitAsync.
The Interlocked methods ensure that failures is accessed in a thread-safe manner.
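The following sketch shows what the caller might observe when awaiting such tasks. It is a simplified, hypothetical setup: ProcessAsync always fails after a simulated delay, the semaphore admits one operation at a time, and the failure threshold is lowered to 2 so the cancellation triggers quickly:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using var semaphore = new SemaphoreSlim(1); // one at a time keeps the demo easy to follow
using var cts = new CancellationTokenSource();
int failures = 0;

// Hypothetical work item that always fails after a simulated I/O delay.
async Task<int> ProcessAsync(int item)
{
    // The wait sits outside the try block: a cancelled wait never reaches
    // finally, so the semaphore is not released without being acquired.
    await semaphore.WaitAsync(cts.Token);
    try
    {
        await Task.Delay(20);
        throw new InvalidOperationException($"item {item} failed");
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 2)
        {
            cts.Cancel(); // cancels the tasks still waiting on the semaphore
        }
        throw;
    }
    finally
    {
        semaphore.Release();
    }
}

var tasks = Enumerable.Range(1, 5).Select(i => ProcessAsync(i)).ToArray();

try
{
    await Task.WhenAll(tasks);
}
catch (Exception ex)
{
    // await rethrows a single exception; inspect the tasks for the full picture.
    Console.WriteLine($"Observed: {ex.GetType().Name}");
}

int faulted = tasks.Count(t => t.IsFaulted);
int canceled = tasks.Count(t => t.IsCanceled);
Console.WriteLine($"faulted={faulted}, canceled={canceled}");
```

The first two tasks fault and the remaining ones are normally cancelled while waiting, although the exact split can vary with timing, since a waiter may acquire the semaphore just as the cancellation is signalled.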
Further Update
It may be even more efficient to use 2 semaphores, which avoids iterating over the results a second time.
Encapsulate all the list-adding into a single method:
void AddToLists(Country country, State state, Calculation calculation)
{
    countries.Add(country);
    states.Add(state);
    myCollection.AddRange(calculation);
}
Then you could allow 2 concurrent operations to serve the HTTP requests and 1 at a time to perform the adds, making that operation thread-safe:
using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();
int failures = 0;
await Task.WhenAll(someCollection.Select(async item =>
{
    await httpSemaphore.WaitAsync(cts.Token);
    try
    {
        var country = await GetCountryAsync(item.ID);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);
        await listAddSemaphore.WaitAsync(cts.Token);
        try
        {
            AddToLists(country, state, calculation);
        }
        finally
        {
            // Release only after a successful wait; releasing an unacquired
            // semaphore would raise its count above 1 and break the mutual exclusion.
            listAddSemaphore.Release();
        }
        Interlocked.Exchange(ref failures, 0);
    }
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        httpSemaphore.Release();
    }
}));