I have the following architecture:
Web Site -calls-> Proxy Api -calls-> External Api
From the website, I have to make up to 25 calls to the proxy API. My goal is to call all the APIs and either:
get all the results (currently using Task.WhenAll), or
throw the first exception received and cancel all the other requests.
Website Code
async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    var apiTasks = apiInfos
        .Select( apiInfo => {
            // Simplified code...but return a call to Task<T> function *without* await
            return GetWebApiResultAsync( apiInfo, token );
        } );

    var xmlResults = await Task.WhenAll( apiTasks );

    foreach( var result in xmlResults )
    {
        // Process results
    }
}

async Task<XElement> GetWebApiResultAsync( ApiInfo info, CancellationToken ct = default )
{
    // simplified but essentially call SendAsync with some processing of info before and results after to return XElement.
    using var response = await httpClient.SendAsync( request, cancellationToken: ct );
    return new XElement( "ResultsGeneratedFromRESTCall" );
}
Proxy Api
app.MapGet("/call-external", async (CancellationToken token) =>
{
    // Some code to prep to call external api, but looks the same as call above
    // but could throw exception
    using var externalResponse = await httpClient.SendAsync( httpRequest, token );
    externalResponse.EnsureSuccessStatusCode();

    var externalResult = ( await externalResponse.Content.ReadFromJsonAsync<JsonObject>() )!;
    if ( externalResult[ "CustomCheck" ] != null )
    {
        throw new ApplicationException( "Custom failure from external source" );
    }

    // Otherwise generate results (could require making additional calls to external api)
});
I was going to change my pattern to follow this answer, which implements a custom ForEachAsync method. Would simply changing my function to look like this accomplish my two goals above (specifically, throwing the first exception it receives and cancelling all other requests)?
New WebSite Code with ForEachAsync
async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    var xmlResults =
        await apiInfos.ForEachAsync(
            new ParallelOptions {
                MaxDegreeOfParallelism = apiInfos.Length,
                CancellationToken = token
            },
            async ( apiInfo, ct ) => {
                // Now returning via await!
                return await GetWebApiResultAsync( apiInfo, ct );
            } );

    // remaining code unchanged
}
Update: Using CancellationTokenSource to Cancel Other Tasks
I updated my call to the following, but now the only exception I get 'at the end' is System.Threading.Tasks.TaskCanceledException instead of the actual exception that was originally thrown. Is that expected? Is there a way to find the original exception?
async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    using var cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource( token );

    var xmlResults =
        await apiInfos.ForEachAsync(
            new ParallelOptions {
                MaxDegreeOfParallelism = apiInfos.Length,
                CancellationToken = cancelTokenSource.Token
            },
            async ( apiInfo, ct ) => {
                // Now returning via await!
                try
                {
                    if ( simulatedException )
                    {
                        await Task.Delay( 1500 );
                        throw new ApplicationException( "Emails endpoint is not supported" );
                    }
                    return await GetWebApiResultAsync( apiInfo, ct );
                }
                catch
                {
                    // Cancel the remaining calls, then rethrow the original exception
                    cancelTokenSource.Cancel();
                    throw;
                }
            } );

    // remaining code unchanged
}
Yes, what you are doing with the ForEachAsync of the other answer is correct, with the exception of the configuration MaxDegreeOfParallelism = apiInfos.Length, which is questionable. You generally do want to impose a limit on the concurrency.
If you wanted to do the same with the simpler Task.WhenAll, you would do something like this:
async Task ProcessApiCallsAsync(ApiInfo[] apiInfos, CancellationToken cancellationToken)
{
    using CancellationTokenSource linkedCts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);

    Task<XElement>[] apiTasks = apiInfos.Select(async apiInfo =>
    {
        // Await inside the try, so that an asynchronous failure reaches the catch
        try { return await GetWebApiResultAsync(apiInfo, linkedCts.Token); }
        catch { linkedCts.Cancel(); throw; }
    }).ToArray();

    XElement[] xmlResults = await Task.WhenAll(apiTasks);
    //...
}
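To see the pattern in isolation, here is a minimal console sketch. FakeCallAsync is a hypothetical stand-in for GetWebApiResultAsync (it is not part of your code), and the ids and delays are arbitrary. It assumes .NET 6 or later with top-level statements. Item 1 fails, the catch cancels the linked token, and the await on Task.WhenAll should surface the original exception rather than a cancellation:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for GetWebApiResultAsync: item 1 fails quickly,
// every other item waits until its token is cancelled.
static async Task<int> FakeCallAsync(int id, CancellationToken ct)
{
    if (id == 1)
    {
        await Task.Delay(50, ct);
        throw new InvalidOperationException("Simulated failure for item 1");
    }
    await Task.Delay(TimeSpan.FromSeconds(30), ct);
    return id;
}

using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken.None);

Task<int>[] tasks = Enumerable.Range(0, 3).Select(async id =>
{
    // Await inside the try, so that an asynchronous failure reaches the catch
    try { return await FakeCallAsync(id, linkedCts.Token); }
    catch { linkedCts.Cancel(); throw; }
}).ToArray();

Exception? caught = null;
try { await Task.WhenAll(tasks); }
catch (Exception ex) { caught = ex; }

Console.WriteLine(caught?.GetType().Name);                          // InvalidOperationException
Console.WriteLine(string.Join(", ", tasks.Select(t => t.Status)));  // Canceled, Faulted, Canceled
```

The Task.WhenAll task completes as faulted whenever at least one of its tasks has faulted, even if the rest were canceled, which is why the original exception is the one that comes out of the await. If more than one task faults, the await rethrows the exception of the first faulted task in list order, which is not necessarily the one that failed first in time.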
But using the Parallel.ForEachAsync is better because it lets you limit the concurrency, so that you don't bombard the remote server with dozens of requests simultaneously.
Btw in your case there is a simpler way to collect the results of the Parallel.ForEachAsync, because the source sequence is an array with known size:
async Task ProcessApiCallsAsync(ApiInfo[] apiInfos, CancellationToken cancellationToken)
{
    ParallelOptions options = new()
    {
        MaxDegreeOfParallelism = 5, // A reasonable number
        CancellationToken = cancellationToken
    };

    XElement[] xmlResults = new XElement[apiInfos.Length];

    await Parallel.ForEachAsync(Enumerable.Range(0, apiInfos.Length), options, async (i, ct) =>
    {
        xmlResults[i] = await GetWebApiResultAsync(apiInfos[i], ct);
    });

    // Here all the slots of the xmlResults array are filled with data.
}
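If you want to try the index-based collection on its own, the following is a small console sketch under the same assumptions as above (.NET 6 or later, top-level statements). FakeCallAsync, the "result-N" strings, the 20 ms delay and the in-flight counter are all made up for illustration. The counter is only there so that you can observe the effect of MaxDegreeOfParallelism:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

object gate = new();
int inFlight = 0, maxInFlight = 0;

// Hypothetical stand-in for GetWebApiResultAsync. It also records how many
// calls are running at the same time.
async Task<string> FakeCallAsync(int id, CancellationToken ct)
{
    lock (gate) { inFlight++; maxInFlight = Math.Max(maxInFlight, inFlight); }
    try
    {
        await Task.Delay(20, ct);
        return $"result-{id}";
    }
    finally
    {
        lock (gate) { inFlight--; }
    }
}

int[] inputs = Enumerable.Range(0, 25).ToArray();
string[] results = new string[inputs.Length];
ParallelOptions options = new() { MaxDegreeOfParallelism = 5 };

await Parallel.ForEachAsync(Enumerable.Range(0, inputs.Length), options, async (i, ct) =>
{
    // Each iteration writes only to its own slot, so no synchronization is needed here
    results[i] = await FakeCallAsync(inputs[i], ct);
});

Console.WriteLine($"{results[0]} ... {results[^1]}");      // result-0 ... result-24
Console.WriteLine($"Max calls in flight: {maxInFlight}");  // never more than 5
```

The results end up in the same order as the inputs regardless of the order in which the calls complete, because each result is stored by index.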