Update: 're-submitted' with hopefully easier code for responders to examine and run locally to reproduce the problem.
TLDR; I think my questions below stem from a lack of experience with multi-threaded async/await programming. I think the confusion all comes from the fact that there are multiple await
statements throughout my code/workflow that take an indeterminant amount of time, leading to different timing/logging results/behaviors.
I have the following architecture.
Web Site Api Endpoints -calls-> Proxy Api -calls-> External Api
This question is a follow up to my question: Best way to handle Async/Await and cancellation to cancel all running tasks if any fail which I received great advice on a helper ForEachAsync helper method.
My outstanding issue is trying to understand CancellationToken
and why I'm not receiving consistent OperationCanceledException
exceptions thrown/logged where I thought they might otherwise have been.
I've written a .NET 7 console application that, to the best of my ability, simulates my architecture and the code can be found below.
My Questions
throw new ApplicationException($"Site - Fake Exception for {item}");
and run the application over and over, there are a few notable issues.
httpClient.SendAsync
calls were cancelled. Why, does it take 'that long' to get a http request ready and the CancellationToken
will already have been set to cancelled?logs.Enqueue($"Site: Calling ProxyApiAsync for {item}");
line within the async delegate. Why?throw new ApplicationException($"Site - Fake Exception for {item}");
and run the application over and over, there are a few notable issues.
using System;
using System.Linq;
using System.Threading;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Concurrent;
public static class Program
{
// Use this instead of Console.WriteLine() to enable some 'summary' queries after processing
static ConcurrentQueue<string> logs = new ConcurrentQueue<string>();
public static async Task Main()
{
logs = new ConcurrentQueue<string>();
var builder = WebApplication.CreateBuilder();
builder
.Logging.SetMinimumLevel(LogLevel.None)
.Services.AddHttpClient();
var app = builder.Build();
app.MapGet("/proxyapi/{taskId}", ProxyApiAsync); // simulated Proxy Api web api
var minApiTask = app.RunAsync();
await Task.Delay(100); // Wait for the API to start
var httpClientFactory = app.Services.GetRequiredService<IHttpClientFactory>(); // simulated support for DI
var apiInfosToRun = Enumerable.Range(0, 10).ToArray(); // simulated list of APIs to call
try
{
await WebSiteEndpointAsync( apiInfosToRun, httpClientFactory );
}
catch (Exception ex)
{
logs.Enqueue($"Site: {ex.GetType().Name}: {ex.Message}");
}
await app.StopAsync();
await minApiTask;
Console.WriteLine($"{logs.Count(l => l.StartsWith("Site: Calling Proxy"))} calls from Site -> Api Proxy");
Console.WriteLine($"{logs.Count(l => l.StartsWith("ProxyApiAsync: Calling external"))} calls from Api Proxy -> External");
Console.WriteLine($"{logs.Count(l => l.StartsWith("OperationCanceledException: ProxyApiAsync"))} OperationCanceledException caught in Proxy Api");
Console.WriteLine($"{logs.Count(l => l.StartsWith("OperationCanceledException: Site Delegate"))} OperationCanceledException caught in Site Delegate");
Console.WriteLine($"{logs.Count(l => l.StartsWith("OperationCanceledException: Site ForEachAsync"))} OperationCanceledException caught in Site ForEachAsync Extension");
Console.WriteLine($"{logs.Count(l => l.StartsWith("ProxyApiAsync: ApplicationException"))} ApplicationException caught in Proxy Api");
Console.WriteLine($"{logs.Count(l => l.StartsWith("Site: ApplicationException"))} ApplicationException caught in Site");
Console.WriteLine("");
foreach (var log in logs)
{
Console.WriteLine(log);
}
}
static async Task WebSiteEndpointAsync( int[] apiInfosToRun, IHttpClientFactory httpClientFactory )
{
logs.Enqueue($"Site: {apiInfosToRun.Length} APIs to run");
var apiResponses = await apiInfosToRun.ForEachAsync(
new ParallelOptions { MaxDegreeOfParallelism = Int32.MaxValue },
async (item, ct) =>
{
try
{
logs.Enqueue($"Site: Calling ProxyApiAsync for {item}");
if (item == 3)
{
// throw new ApplicationException($"Site - Fake Exception for {item}");
}
using var client = httpClientFactory.CreateClient();
using var request = new HttpRequestMessage
{
Method = HttpMethod.Get,
RequestUri = new Uri($"http://localhost:5000/proxyapi/{item}"),
};
var apiResponse = await client.SendAsync(request, ct);
apiResponse.EnsureSuccessStatusCode();
var apiResult = await apiResponse.Content.ReadAsStringAsync(ct);
if (apiResult.StartsWith("FAILED"))
{
throw new ApplicationException(apiResult);
}
return apiResponse;
}
catch (OperationCanceledException)
{
logs.Enqueue($"OperationCanceledException: Site Delegate for {item}");
throw;
}
}
);
logs.Enqueue("Site: Finished all APIs");
}
static async Task<string> ProxyApiAsync(int taskId, IHttpClientFactory httpClientFactory, CancellationToken cancellationToken )
{
try
{
logs.Enqueue($"ProxyApiAsync: Calling external API for {taskId}");
var httpClient = httpClientFactory.CreateClient();
var response = await httpClient.GetAsync( "https://www.msn.com/", cancellationToken ); // Simulated call to external api
if (taskId == 3)
{
throw new ApplicationException($"Proxy Api - Fake Exception for {taskId}");
}
response.EnsureSuccessStatusCode();
var content = await response.Content.ReadAsStringAsync( cancellationToken );
return $"SUCCESS: Length={content.Length}";
}
catch (OperationCanceledException)
{
logs.Enqueue($"OperationCanceledException: ProxyApiAsync for {taskId}");
return "CANCELLED";
}
catch ( Exception ex )
{
logs.Enqueue($"ProxyApiAsync: {ex.GetType().Name}: {ex.Message}");
return $"FAILED: {ex.Message}";
}
}
static async Task<TResult[]> ForEachAsync<TSource, TResult>(
this TSource[] source,
ParallelOptions parallelOptions,
Func<TSource, CancellationToken, ValueTask<TResult>> body
)
{
TResult[] results = new TResult[source.Length];
await Parallel.ForEachAsync(
Enumerable.Range(0, source.Length),
parallelOptions,
async (i, ct) =>
{
try
{
results[i] = await body(source[i], ct); // .ConfigureAwait( false );
}
catch (OperationCanceledException)
{
logs.Enqueue($"OperationCanceledException: Site ForEachAsync Extension for {source[i]}");
throw;
}
});
return results;
}
}
Parallel.ForEachAsync
will cancel all remaining tasks in the event of an exception.
throw
is happening before the first await
, which probably means that none of the other tasks even started running.
await
had not been hit on the other tasks.await
.