Search code examples
c#async-awaittask-parallel-librarycancellation-token

Best way to handle Async/Await and cancellation to cancel all running tasks if any fail


I have the following architecture:

Web Site -calls-> Proxy Api -calls-> External Api

From the website, I have to make <= 25 calls from the web site to the proxy api. My goal is to call all apis and either:

  1. Get all results (currently using Task.WhenAll), or
  2. As soon as any of them fail, catch that exception and cancel all other tasks.

Website Code

async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    var apiTasks = apiInfos
        .Select( apiInfo => {
            // Simplified code...but return a call to Task<T> function *without* await
            return GetWebApiResultAsync( info, token );
        } );

    var xmlResults = await Task.WhenAll( apiTasks );

    foreach( var result in xmlResults )
    {
        // Process results
    }
}

async Task<XElement> GetWebApiResultAsync( ApiInfo info, CancellationToken ct = default )
{
    // simplified but essentially call SendAsync with some processing of info before and results after to return XElement.
    using var response = await httpClient.SendAsync( request, cancellationToken: cancellationToken );

    return new XElement( "ResultsGeneratedFromRESTCall" );

}

Proxy Api

app.MapGet("/call-external", (CancellationToken token) =>
{
    // Some code to prep to call external api, but looks the same as call above
    // but could throw exception
    using var externalResponse = await httpClient.SendAsync( httpRequest, token );

    externalResponse.EnsureSuccessStatusCode();

    var externalResult = ( await coverageResponse.Content.ReadFromJsonAsync<JsonObject>() )!;

    if ( externalResult[ "CustomCheck" ] != null )
    {
        throw new ApplicationException( "Custom failure from external source" );
    }

    // Otherwise generate results (could require making additional calls to external api)
});

I was going to change my pattern to this answer that implemented a custom ForEachAsync method. Would simply changing my function to look like this accomplish my two goals above (specifically throwing first exception it receives and cancelling all other requests)?

New WebSite Code with ForEachAsync

async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    var xmlResults =
        await apiInfos.ForEachAsync( 
            new ParallelOptions { 
                MaxDegreeOfParallelism = apiInfos.Length, 
                CancellationToken = token
            },
            async ( apiInfo, ct ) => {
                // Now returning via await!
                return await GetWebApiResultAsync( info, ct );
            } );

    // remaining code unchanged
}

Update: Using CancellationTokenSource to Cancel Other Tasks

I updated my call to the following, but now the only exception I get 'at the end' is System.Threading.Tasks.TaskCanceledException instead of the actual Exception that was originally thrown. Is that expected? Is there a way to find the original exception?

async Task ProcessApiCallsAsync( ApiInfo[] apiInfos, CancellationToken token )
{
    using var cancelTokenSource = CancellationTokenSource.CreateLinkedTokenSource( token );

    var xmlResults =
        await apiInfos.ForEachAsync( 
            new ParallelOptions { 
                MaxDegreeOfParallelism = apiInfos.Length, 
                CancellationToken = cancelTokenSource.Token
            },
            async ( apiInfo, ct ) => {
                // Now returning via await!
                try
                {
                    if ( simulatedException )
                    {
                        await Task.Delay( 1500 );
                        throw new ApplicationException( "Emails endpoint is not supported" );
                    }

                    return await GetWebApiResultAsync( info, ct );
                }
                catch
                {
                    cancelTokenSource.Cancel();
                    throw;
                }
            } );

    // remaining code unchanged
}

Solution

  • Yes, what you are doing with the ForEachAsync of the other answer is correct, with the exception of the configuration MaxDegreeOfParallelism = apiInfos.Length, which is questionable. You generally do want to impose a limit to the concurrency.

    If you wanted to do the same with the simpler Task.WhenAll, you would do something like this:

    async Task ProcessApiCallsAsync(ApiInfo[] apiInfos, CancellationToken cancellationToken)
    {
        using CancellationTokenSource linkedCts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
    
        Task<XElement>[] apiTasks = apiInfos.Select(apiInfo =>
        {
            try { return GetWebApiResultAsync(info, linkedCts.Token); }
            catch { cts.Cancel(); throw; }
        }).ToArray();
    
        XElement[] xmlResults = await Task.WhenAll(apiTasks);
    
        //...
    }
    

    But using the Parallel.ForEachAsync is better because it allows to limit the concurrency, so that you don't bombard the remoter server with dozens of requests simultaneously.

    Btw in your case there is a simpler way to collect the results of the Parallel.ForEachAsync, because the source sequence is an array with known size:

    async Task ProcessApiCallsAsync(ApiInfo[] apiInfos, CancellationToken cancellationToken)
    {
        ParallelOptions options = new()
        { 
            MaxDegreeOfParallelism = 5, // A reasonable number
            CancellationToken = cancellationToken
        };
    
        XElement[] xmlResults = new XElement[apiInfos.Length];
    
        await Parallel.ForEachAsync(Enumerable.Range(0, apiInfos.Length), options, async (i, ct) =>
        {
            xmlResults[i] = await GetWebApiResultAsync(apiInfos[i], ct);
        });
    
        // Here all the slots of the xmlResults array are filled with data.
    }