c# async-await polly retry-logic bulkhead

How to execute a Polly policy for all elements of an IEnumerable, and stop on first unhandled exception


The policies of the Polly library (Bulkhead, Retry, etc.) expose an ExecuteAsync method with many overloads (18), but none of them executes the policy for every element of an IEnumerable and gathers the results. The whole library seems focused on executing a single action, leaving the responsibility of managing multiple executions to the client code. I would like to fill this gap by implementing an extension method for all Polly policies (all implementations of the IAsyncPolicy interface), with the signature below:

public static Task<TResult[]> ExecuteAsync<TSource, TResult>(
    this IAsyncPolicy policy,
    IEnumerable<TSource> source,
    Func<TSource, Task<TResult>> action,
    bool continueOnCapturedContext = false,
    bool onErrorContinue = false)

The continueOnCapturedContext parameter controls whether to continue on the captured synchronization context, and should just be passed to the native ExecuteAsync method:

Task<TResult> IAsyncPolicy.ExecuteAsync<TResult>(
    Func<CancellationToken, Task<TResult>> action,
    CancellationToken cancellationToken,
    bool continueOnCapturedContext);

The onErrorContinue parameter is the most important aspect of this question, since it controls the behavior in case of a policy failure. I intend to use this extension method with thousands of elements, and if any exception occurs that is not expected/handled by my policy¹, I would like the whole execution to terminate promptly and gracefully. When onErrorContinue has its default value of false, the first unhandled exception should cancel all pending operations, and the whole execution should terminate as soon as all already-started operations have completed. In the opposite case, onErrorContinue: true, all elements should be processed by the policy. In both cases, all exceptions should finally be propagated, bundled in an AggregateException.

How could I implement this extension method?

Hypothetical usage scenario of this method:

var policy = Policy
    .BulkheadAsync(maxParallelization: 10, maxQueuingActions: Int32.MaxValue)
    .WrapAsync(Policy
        .Handle<HttpRequestException>()
        .WaitAndRetryAsync(retryCount: 3,
            sleepDurationProvider: n => TimeSpan.FromMilliseconds(1000 * n))
    );

var urls = Enumerable.Range(1, 1000).Select(n => n.ToString());
var random = new Random(0);
string[] results = await policy.ExecuteAsync(urls, async url =>
{
    await Task.Delay(500); // Simulate a web request
    lock (random) if (random.NextDouble() < 0.66)
        throw new HttpRequestException($"Url #{url} failed");
    return url;
}, onErrorContinue: false);

¹ This should happen rarely in production, but may happen frequently during development, and could hurt productivity.


Solution

  • Here is my implementation of the ExecuteAsync method. A CancellationTokenSource is used to cancel the pending operations in case of an exception. Task.WhenAll does a good job of ignoring the OperationCanceledExceptions when there are more important exceptions to propagate. Finally, the Task.WhenAll task is returned without being awaited, in order to preserve all exceptions.

    public static Task<TResult[]> ExecuteAsync<TSource, TResult>(
        this IAsyncPolicy policy,
        IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> action,
        bool continueOnCapturedContext = false,
        bool onErrorContinue = false)
    {
        // Arguments validation omitted
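        var cts = new CancellationTokenSource();
        // With onErrorContinue the policy gets a token that is never canceled
        var token = !onErrorContinue ? cts.Token : default;
        var tasks = source.Select(async (item) =>
        {
            try
            {
                // The user-supplied action does not accept a token, so the
                // token only affects the policy itself (e.g. queued actions)
                return await policy.ExecuteAsync(async _ =>
                {
                    return await action(item);
                }, token, continueOnCapturedContext);
            }
            catch
            {
                // First unhandled failure: cancel everything still pending
                if (!onErrorContinue) cts.Cancel();
                throw;
            }
        }).ToArray(); // Materialize, so that all tasks are started
        var whenAll = Task.WhenAll(tasks);
        // Dispose the CancellationTokenSource after all tasks have completed
        _ = whenAll.ContinueWith(_ => cts.Dispose(), TaskScheduler.Default);
        return whenAll;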
    }
    

    Emulating the Task.WhenAll behavior, which is desirable in this case, is quite tricky otherwise (with async/await). So I am happily avoiding this trouble by using a small and dirty ContinueWith, in order to finally dispose the CancellationTokenSource.

    An alternative way of dealing with multiple exceptions is presented here. This solution propagates a nested AggregateException, which sounds ugly, but in practice it's OK because awaiting an asynchronous method eliminates one level of nesting anyway.
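The difference between the two propagation styles mentioned above can be seen in a small Polly-free sketch. It is only an illustration under stated assumptions: the names ExceptionPropagationDemo, FailAsync, WhenAllNested and CountVisibleAsync are hypothetical, and WhenAllNested is one common way to obtain a nested AggregateException, not necessarily the approach of the linked answer.

```csharp
using System;
using System.Threading.Tasks;

public static class ExceptionPropagationDemo
{
    // Hypothetical helper: an operation that always fails asynchronously.
    public static async Task<int> FailAsync(int n)
    {
        await Task.Yield();
        throw new InvalidOperationException($"Item #{n} failed");
    }

    // Hypothetical wrapper: rethrows the AggregateException stored in the
    // Task.WhenAll task. Because this is an async method, the returned task
    // wraps it once more, so its Exception property is nested.
    public static async Task<int[]> WhenAllNested(params Task<int>[] tasks)
    {
        Task<int[]> whenAll = Task.WhenAll(tasks);
        try { return await whenAll; }
        catch when (whenAll.IsFaulted) { throw whenAll.Exception; }
    }

    // Counts how many exceptions a caller sees when it simply awaits the task.
    public static async Task<int> CountVisibleAsync(Task<int[]> task)
    {
        try { await task; return 0; }
        catch (AggregateException aex) { return aex.InnerExceptions.Count; }
        catch (Exception) { return 1; } // await surfaced a single exception
    }

    public static async Task Main()
    {
        // Awaiting the plain Task.WhenAll task surfaces only one exception;
        // the rest remain available through the task's Exception property.
        int plain = await CountVisibleAsync(
            Task.WhenAll(FailAsync(1), FailAsync(2)));

        // Awaiting the wrapper removes one level of nesting and surfaces
        // an AggregateException that contains both failures.
        int nested = await CountVisibleAsync(
            WhenAllNested(FailAsync(1), FailAsync(2)));

        Console.WriteLine($"plain await: {plain}, nested wrapper: {nested}");
    }
}
```

With the ExecuteAsync extension method of this answer, which returns the Task.WhenAll task directly, a caller that needs every exception would store the returned task in a variable before awaiting it, and then read the Exception property of that task in the catch block.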