Search code examples
c#asynchronous.net-6.0parallel.foreachasync

How to break the Parallel.ForEachAsync loop, not cancel it?


In .NET 5 we had Parallel.ForEach which you were able to use ParallelLoopState.Break() method to stop additional iterations from processing. Allowing current ones to complete processing.

But the new .NET 6 Parallel.ForEachAsync does not have the ParallelLoopState class so we can't break it like we could with Parallel.ForEach. So is there a way to perform the same break functionality in ForEachAsync? CancellationToken passed to the func I don't believe is the right way since your not trying to cancel the running loop but preventing additional iterations from starting.

Something like this functionality but for the async version:

int count = 0;
Parallel.ForEach(enumerateFiles, new ParallelOptions() { CancellationToken = cancellationToken},
    (file, state) =>
    {
        Interlocked.Increment(ref count);
        if (count >= MaxFilesToProcess)
        {
            state.Break();
        }
...

As a workaround I can probably use .Take([xx]) on the TSource before it is passed into the parallel loop but that might not be an option for a complex condition to break on.


Solution

  • The asynchronous API Parallel.ForEachAsync does not offer the Stop/Break functionality of its synchronous counterpart.

    One way to replicate this functionality is to use a bool flag in combination with the TakeWhile LINQ operator:

    bool breakFlag = false;
    await Parallel.ForEachAsync(
        source.TakeWhile(_ => !Volatile.Read(ref breakFlag)),
        async (item, ct) =>
    {
        // ...
        if (condition) Volatile.Write(ref breakFlag, true);
        // ...
    });
    

    The Parallel.ForEachAsync does not buffer aggressively elements from the source sequence¹, like the Parallel.ForEach does, so as soon as the condition is met, no more asynchronous operations are going to start.

    Is case the source is an asynchronous enumerable (IAsyncEnumerable<T>), there is a compatible TakeWhile operator with identical functionality in the System.Linq.Async package.

    ¹ At least not today (.NET 6). This behavior is neither documented nor guaranteed.