
From IEnumerable<Task<T>> to IAsyncEnumerable<T> by yield returning inside a Parallel.ForEach/Parallel.ForEachAsync gives error CS1621


In a .NET 6 project, I have to call a web API that is offset-paginated (page/per page), and I would like to make the n calls in parallel as much as possible.

This is the method that calls the API once with the given page number:

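private Task<ApiResponse> CallApiAsync(int page,
    CancellationToken cancellationToken = default)
{
    // Return the Task directly: .ConfigureAwait(false) produces a
    // ConfiguredTaskAwaitable<T>, not a Task<T>, so it cannot be returned here.
    return GetFromJsonAsync<ApiResponse>($"...&page={page}", cancellationToken);
}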

What I actually need is a forward-only, streamable iterator over the results of all the API calls, from page 1 to page n. Given this requirement, I thought IAsyncEnumerable was the right API to use: I could fire the API calls in parallel and access each response as soon as it is ready, without waiting for all of them to finish.

So I came up with the following code:

public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    int numProducts = GetNumberOfProducts(perPage);

    int numCalls = MathExtensions.CeilDiv(numProducts, perPage);

    var pages = Enumerable.Range(1, numCalls);

    Parallel.ForEach(pages, async page => {
        yield return await CallApiAsync(page, cancellationToken).ConfigureAwait(false);
    });

    yield break;
}

But I get the following error on the yield statement: CS1621 - The yield statement cannot be used inside an anonymous method or lambda expression.
Is there a way to achieve what I want?
Feel free to ask questions if I wasn't clear enough!
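CS1621 arises because a yield inside the lambda would belong to the lambda, not to the enclosing iterator method, and C# does not allow lambdas to be iterators. So the yield statements have to sit in the iterator's own body. One way to go from IEnumerable<Task<T>> to IAsyncEnumerable<T>, as in the title, is to start all the tasks and then yield from a plain loop inside the iterator. The following is a minimal sketch, assuming it is acceptable to start every call at once (no throttling) and to receive the responses in completion order rather than page order; TaskStreams and InCompletionOrder are hypothetical names:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class TaskStreams
{
    // Turns a sequence of tasks into an async stream that yields each result
    // as soon as its task completes (completion order, not source order).
    // The yield statement lives in this iterator's own body, not in a lambda,
    // so CS1621 does not apply.
    public static async IAsyncEnumerable<T> InCompletionOrder<T>(IEnumerable<Task<T>> tasks)
    {
        // Materializing the sequence (e.g. a lazy Select over the pages)
        // starts every call up front; there is no limit on concurrency.
        List<Task<T>> pending = tasks.ToList();

        while (pending.Count > 0)
        {
            // Wait for whichever pending task finishes first.
            Task<T> finished = await Task.WhenAny(pending).ConfigureAwait(false);
            pending.Remove(finished);

            // Awaiting the finished task returns its result, or rethrows its exception.
            yield return await finished.ConfigureAwait(false);
        }
    }
}
```

With the question's method this would be used roughly as TaskStreams.InCompletionOrder(pages.Select(page => CallApiAsync(page, cancellationToken))). Each Task.WhenAny call scans the whole pending list, so the loop is O(n²) in the number of pages: negligible for a few dozen pages, but worth bounding for thousands.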


Solution

  • The most readily available tool that you can use for this purpose is a TransformBlock<TInput,TOutput> from the TPL Dataflow library. This component is natively available in .NET Core and later, and it is essentially a processor/projector/transformer with two queues (input and output). You specify the processing function, then you configure the options according to your needs, then you feed it with data, and finally you retrieve the processed output:

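    using System.Runtime.CompilerServices; // EnumeratorCancellationAttribute
    using System.Threading.Tasks.Dataflow; // TransformBlock, ExecutionDataflowBlockOptions
    
    public async IAsyncEnumerable<ApiResponse> CallApiEnumerableAsync(int perPage,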
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        int numProducts = GetNumberOfProducts(perPage);
        int numCalls = MathExtensions.CeilDiv(numProducts, perPage);
        var pages = Enumerable.Range(1, numCalls);
    
        TransformBlock<int, ApiResponse> block = new(async page =>
        {
            return await CallApiAsync(page, cancellationToken);
        }, new ExecutionDataflowBlockOptions()
        {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = 10, // Configurable, the default is 1
            EnsureOrdered = true, // This is the default
        });
    
        // Feed the block with input data
        foreach (var page in pages) block.Post(page);
        block.Complete();
    
        // Emit the output data as they become available
        while (await block.OutputAvailableAsync())
            while (block.TryReceive(out var item))
                yield return item;
    
        // Propagate possible exception (including cancellation)
        await block.Completion;
    }
    

    This simple implementation starts the TransformBlock when the resulting IAsyncEnumerable<ApiResponse> is enumerated, and it won't stop until either all the processing is completed or the cancellationToken is canceled. The processing is not driven by the enumeration of the resulting sequence: the block keeps calling the API and buffering the responses even if the consumer is slow. It won't even stop if the client code simply abandons the enumeration by breaking out of the await foreach loop. If you want this functionality (graceful termination), you will have to add a try-finally block and an internal CancellationTokenSource linked to the cancellationToken, and give its token to the block instead. The yielding loop should be placed inside the try, and the cancellation of the linked CancellationTokenSource inside the finally.
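The graceful-termination variant described above could look roughly like the following. This is a sketch, not a definitive implementation: to keep it self-contained and runnable it assumes a hypothetical FakeCallApiAsync stand-in that simulates the web call with Task.Delay, returns string instead of ApiResponse, and takes the number of calls directly instead of computing it from perPage. Pager is likewise a hypothetical name.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Pager
{
    // Hypothetical stand-in for CallApiAsync: simulates a web call per page.
    public static async Task<string> FakeCallApiAsync(int page, CancellationToken ct)
    {
        await Task.Delay(10, ct).ConfigureAwait(false);
        return $"page {page}";
    }

    public static async IAsyncEnumerable<string> CallApiEnumerableAsync(int numCalls,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Canceled either by the caller's token or by the finally block below.
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        CancellationToken token = linkedCts.Token; // captured once, before any disposal

        var block = new TransformBlock<int, string>(
            page => FakeCallApiAsync(page, token),
            new ExecutionDataflowBlockOptions
            {
                CancellationToken = token,
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = true,
            });

        foreach (var page in Enumerable.Range(1, numCalls)) block.Post(page);
        block.Complete();

        try
        {
            // Emit the output as it becomes available.
            while (await block.OutputAvailableAsync().ConfigureAwait(false))
                while (block.TryReceive(out var item))
                    yield return item;

            // Propagate possible exception (including cancellation by the caller).
            await block.Completion.ConfigureAwait(false);
        }
        finally
        {
            // Runs on normal completion, on failure, and when the consumer breaks
            // out of await foreach (which disposes the enumerator): stops pending work.
            linkedCts.Cancel();
        }
    }
}
```

Placing await block.Completion inside the try as well means that, on the normal path, the finally cancels only after the block has already finished, so a successful run is never reported as canceled.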