c# asynchronous async-await cancellation-token

Site to Api to External Api question regarding Parallel.ForEachAsync and CancellationTokens


Update: 're-submitted' with hopefully easier code for responders to examine and run locally to reproduce the problem.

TL;DR: I think my questions below stem from a lack of experience with multi-threaded async/await programming. I think the confusion comes from the fact that there are multiple await statements throughout my code/workflow that take an indeterminate amount of time, leading to different timing, logging results, and behaviors.

I have the following architecture.

Web Site Api Endpoints -calls-> Proxy Api -calls-> External Api

This question is a follow-up to my question "Best way to handle Async/Await and cancellation to cancel all running tasks if any fail", where I received great advice on a ForEachAsync helper method.

My outstanding issue is trying to understand CancellationToken and why OperationCanceledException is not consistently thrown/logged where I expected it to be.

I've written a .NET 7 console application that, to the best of my ability, simulates my architecture and the code can be found below.

My Questions

  1. If you uncomment throw new ApplicationException($"Site - Fake Exception for {item}"); and run the application over and over, there are a few notable issues.
    1. I never receive logs from Proxy Api, as if the requests never got there before the httpClient.SendAsync calls were cancelled. Why? Does it take 'that long' to get an HTTP request ready, so that the CancellationToken has already been cancelled by then?
    2. I don't always receive 10 logs for 'Site -> Api Proxy' (the logs.Enqueue($"Site: Calling ProxyApiAsync for {item}"); line within the async delegate). Why?
  2. If you comment out throw new ApplicationException($"Site - Fake Exception for {item}"); and run the application over and over, there are a few notable issues.
    1. I successfully get 10 logs for both 'Site -> Api Proxy' and 'Api Proxy -> External', but I don't always get 9 'OperationCanceledException caught' logs in Site and Api Proxy. If I submitted 10 requests to the Api Proxy and 1 fails, shouldn't I always get 9 cancels? (Writing this, I think I just discovered why: the 'missing cancels' represent Proxy -> External calls that finished successfully before task 3 threw its exception?)
using System;
using System.Linq;
using System.Threading;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System.Net.Http;
using System.Threading.Tasks;
using System.Collections.Concurrent;
                    
public static class Program
{
    // Use this instead of Console.WriteLine() to enable some 'summary' queries after processing
    static ConcurrentQueue<string> logs = new ConcurrentQueue<string>();
    
    public static async Task Main()
    {
        logs = new ConcurrentQueue<string>();
        
        var builder = WebApplication.CreateBuilder();
        builder
            .Logging.SetMinimumLevel(LogLevel.None)
            .Services.AddHttpClient();
        var app = builder.Build();

        app.MapGet("/proxyapi/{taskId}", ProxyApiAsync); // simulated Proxy Api web api
        
        var minApiTask = app.RunAsync();
        await Task.Delay(100); // Wait for the API to start
        
        var httpClientFactory = app.Services.GetRequiredService<IHttpClientFactory>(); // simulated support for DI
        var apiInfosToRun = Enumerable.Range(0, 10).ToArray(); // simulated list of APIs to call

        try
        {
            await WebSiteEndpointAsync( apiInfosToRun, httpClientFactory );
        }
        catch (Exception ex)
        {
            logs.Enqueue($"Site: {ex.GetType().Name}: {ex.Message}");
        }

        await app.StopAsync();
        await minApiTask;

        Console.WriteLine($"{logs.Count(l => l.StartsWith("Site: Calling Proxy"))} calls from Site -> Api Proxy");
        Console.WriteLine($"{logs.Count(l => l.StartsWith("ProxyApiAsync: Calling external"))} calls from Api Proxy -> External");
        Console.WriteLine($"{logs.Count(l => l.StartsWith("OperationCanceledException: ProxyApiAsync"))} OperationCanceledException caught in Proxy Api");
        Console.WriteLine($"{logs.Count(l => l.StartsWith("OperationCanceledException: Site Delegate"))} OperationCanceledException caught in Site Delegate");
        Console.WriteLine($"{logs.Count(l => l.StartsWith("OperationCanceledException: Site ForEachAsync"))} OperationCanceledException caught in Site ForEachAsync Extension");
        Console.WriteLine($"{logs.Count(l => l.StartsWith("ProxyApiAsync: ApplicationException"))} ApplicationException caught in Proxy Api");
        Console.WriteLine($"{logs.Count(l => l.StartsWith("Site: ApplicationException"))} ApplicationException caught in Site");
        Console.WriteLine("");

        foreach (var log in logs)
        {
            Console.WriteLine(log);
        }
    }

    static async Task WebSiteEndpointAsync( int[] apiInfosToRun, IHttpClientFactory httpClientFactory )
    {
        logs.Enqueue($"Site: {apiInfosToRun.Length} APIs to run");

        var apiResponses = await apiInfosToRun.ForEachAsync(
            new ParallelOptions { MaxDegreeOfParallelism = Int32.MaxValue },
            async (item, ct) =>
            {
                try
                {
                    logs.Enqueue($"Site: Calling ProxyApiAsync for {item}");

                    if (item == 3)
                    {
                        // throw new ApplicationException($"Site - Fake Exception for {item}");
                    }

                    using var client = httpClientFactory.CreateClient();
                    using var request = new HttpRequestMessage
                    {
                        Method = HttpMethod.Get,
                        RequestUri = new Uri($"http://localhost:5000/proxyapi/{item}"),
                    };
                    var apiResponse = await client.SendAsync(request, ct);

                    apiResponse.EnsureSuccessStatusCode();

                    var apiResult = await apiResponse.Content.ReadAsStringAsync(ct);

                    if (apiResult.StartsWith("FAILED"))
                    {
                        throw new ApplicationException(apiResult);
                    }

                    return apiResponse;
                }
                catch (OperationCanceledException)
                {
                    logs.Enqueue($"OperationCanceledException: Site Delegate for {item}");
                    throw;
                }
            }
        );

        logs.Enqueue("Site: Finished all APIs");

    }
    
    static async Task<string> ProxyApiAsync(int taskId, IHttpClientFactory httpClientFactory, CancellationToken cancellationToken )
    {
        try
        {
            logs.Enqueue($"ProxyApiAsync: Calling external API for {taskId}");
            
            var httpClient = httpClientFactory.CreateClient();
            var response = await httpClient.GetAsync( "https://www.msn.com/", cancellationToken ); // Simulated call to external api

            if (taskId == 3)
            {
                throw new ApplicationException($"Proxy Api - Fake Exception for {taskId}");
            }

            response.EnsureSuccessStatusCode();
            
            var content = await response.Content.ReadAsStringAsync( cancellationToken );
            
            return $"SUCCESS: Length={content.Length}";
        }
        catch (OperationCanceledException)
        {
            logs.Enqueue($"OperationCanceledException: ProxyApiAsync for {taskId}");
            return "CANCELLED";
        }
        catch ( Exception ex )
        {
            logs.Enqueue($"ProxyApiAsync: {ex.GetType().Name}: {ex.Message}");
            return $"FAILED: {ex.Message}";
        }
    }

    static async Task<TResult[]> ForEachAsync<TSource, TResult>(
        this TSource[] source,
        ParallelOptions parallelOptions,
        Func<TSource, CancellationToken, ValueTask<TResult>> body
    )
    {
        TResult[] results = new TResult[source.Length];

        await Parallel.ForEachAsync(
            Enumerable.Range(0, source.Length),
            parallelOptions,
            async (i, ct) =>
            {
                try
                {
                    results[i] = await body(source[i], ct); // .ConfigureAwait( false );
                }
                catch (OperationCanceledException)
                {
                    logs.Enqueue($"OperationCanceledException: Site ForEachAsync Extension for {source[i]}");
                    throw;
                }
            });

        return results;
    }
}

Solution

  • Parallel.ForEachAsync cancels all remaining tasks in the event of an exception: as soon as one body throws, the CancellationToken passed to every body is cancelled.

    1. The throw happens before the first await in that delegate, which probably means that none of the other tasks had really started running when the loop was cancelled.
      1. It doesn't take that long. The requests never went out because the other tasks had not yet reached their first await when the token was cancelled.
      2. Same reason: some items never began running at all.
    2. Yes, correct. The task that throws cancels the whole operation; by then some tasks have not yet begun, and others have already completed. In this version, the exception happens after the first await.
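
To see this behavior without any HTTP calls involved, here is a minimal sketch of the same loop. It is not the code from the question: Task.Delay stands in for the request to the Proxy Api, and the CancellationDemo class, the ThrowBeforeFirstAwait switch, and the three queues are hypothetical names introduced only for illustration. The delays are arbitrary assumptions, so the exact counts depend on timing and will vary from run to run.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical switch: true mimics question 1 (throw before the first await),
    // false mimics question 2 (throw after the first await).
    static readonly bool ThrowBeforeFirstAwait = true;

    public static async Task Main()
    {
        var started = new ConcurrentQueue<int>();
        var cancelled = new ConcurrentQueue<int>();
        var completed = new ConcurrentQueue<int>();

        try
        {
            await Parallel.ForEachAsync(
                Enumerable.Range(0, 10),
                new ParallelOptions { MaxDegreeOfParallelism = Int32.MaxValue },
                async (item, ct) =>
                {
                    started.Enqueue(item);

                    if (item == 3 && ThrowBeforeFirstAwait)
                    {
                        throw new ApplicationException($"Fake exception before await for {item}");
                    }

                    try
                    {
                        // Stand-in for the HTTP call (assumption): each item takes a
                        // different amount of time and observes the loop's token.
                        await Task.Delay(20 * (item + 1), ct);

                        if (item == 3)
                        {
                            throw new ApplicationException($"Fake exception after await for {item}");
                        }

                        completed.Enqueue(item);
                    }
                    catch (OperationCanceledException)
                    {
                        // Only bodies that were awaiting when the token was cancelled land here.
                        cancelled.Enqueue(item);
                        throw;
                    }
                });
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Loop ended with {ex.GetType().Name}: {ex.Message}");
        }

        Console.WriteLine($"Started:   {started.Count} of 10");
        Console.WriteLine($"Completed: {completed.Count}");
        Console.WriteLine($"Cancelled: {cancelled.Count}");
    }
}
```

With the switch set to true, the started count may be below 10, since items that were never picked up before the cancellation never run. With it set to false, the items with shorter delays than item 3 can finish before it throws, so completed plus cancelled should account for the other nine items, but the split between them is not fixed.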