Stop Parallel.ForEachAsync


In C#, I am interested in stopping a Parallel.ForEachAsync loop (considering the differences between Stop and Break); for Parallel.ForEach I can do the following:

Parallel.ForEach(items, (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        state.Stop();
        return;
    }

    // some process on the item
    Process(item);
});

However, since I have a process that needs to be executed asynchronously, I switched to Parallel.ForEachAsync. ForEachAsync does not have a Stop() method. I'm able to break out of the loop as follows, but I'm wondering whether this is the most effective way to do it (in other words, the loop needs to stop ASAP when it receives the cancellation request).

await Parallel.ForEachAsync(items, async (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    // some async process on the item
    await ProcessAsync(item);
});

Solution

  • The Parallel.ForEachAsync body delegate has a CancellationToken as its second parameter. This token is supplied by the API; it is not the same token that you passed in the ParallelOptions. You can forward this token to any asynchronous method that you invoke inside the lambda. If you invoke non-cancelable methods, then the best you can do is to call ThrowIfCancellationRequested at strategic places inside the lambda:

    CancellationTokenSource cts = new();
    ParallelOptions options = new() { CancellationToken = cts.Token };
    
    try
    {
        await Parallel.ForEachAsync(items, options, async (item, ct) =>
        {
            //...
            ct.ThrowIfCancellationRequested();
            //...
            await ProcessAsync(item, ct);
            //...
            ct.ThrowIfCancellationRequested();
            //...
        });
    }
    catch (OperationCanceledException)
    {
        // ... (the cts was canceled)
    }
    

    The token provided as an argument to the lambda (the ct in the above example) is canceled not only when the ParallelOptions.CancellationToken is canceled, but also when a ProcessAsync operation has failed. This mechanism allows faster propagation of exceptions. The parallel loop does not complete immediately when an error occurs, because it follows the principle of disallowing fire-and-forget operations: all operations that are started internally by the loop must complete before the whole loop completes, either successfully or with failure. Observing the token inside the lambda makes it possible to reduce this latency to a minimum.
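
The behavior described above can be observed with a small sketch. It assumes .NET 6 or later and a hypothetical ProcessAsync in which item 0 fails once the other three items are in flight, while every other item waits on the token it was given. The names started, othersStarted and canceledItems, as well as the MaxDegreeOfParallelism value, are illustrative assumptions and not part of the original answer:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int started = 0;
var othersStarted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var canceledItems = new ConcurrentBag<int>();
Exception? caught = null;

// Hypothetical stand-in for the ProcessAsync of the question.
// Item 0 fails after the other three items have started;
// every other item waits until the supplied token is canceled.
async Task ProcessAsync(int item, CancellationToken ct)
{
    if (item == 0)
    {
        await othersStarted.Task;
        throw new InvalidOperationException("Item 0 failed.");
    }
    if (Interlocked.Increment(ref started) == 3) othersStarted.SetResult();
    await Task.Delay(Timeout.Infinite, ct); // completes only through cancellation
}

// No external token is supplied, so nothing outside the loop cancels it.
// A degree of parallelism of 4 lets all four items run concurrently.
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

try
{
    await Parallel.ForEachAsync(Enumerable.Range(0, 4), options, async (item, ct) =>
    {
        try
        {
            await ProcessAsync(item, ct);
        }
        catch (OperationCanceledException)
        {
            canceledItems.Add(item); // ct was canceled because another item failed
            throw;
        }
    });
}
catch (Exception ex)
{
    caught = ex; // the loop surfaces the failure only after all items have ended
}

Console.WriteLine($"Loop failed with: {caught?.GetType().Name}");
Console.WriteLine($"Items that observed cancellation: {canceledItems.Count}");
```

Had the waiting items ignored ct, the final await would never return in this sketch, because the loop waits for every operation it has started before it completes.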