
How to stop propagating an asynchronous stream (IAsyncEnumerable)


I have a method that accepts an IAsyncEnumerable as an argument and also returns an IAsyncEnumerable. It calls a web method for each item in the input stream and propagates the result to the output stream. My question is: how can I be notified when the caller of my method has stopped enumerating the output stream, so that I can stop enumerating the input stream inside my method? It seems that such a notification should be possible, because by default the caller disposes the IAsyncEnumerator it gets from my method. Is there any built-in mechanism that generates such a notification for compiler-generated async methods? If not, what is the easiest alternative to implement?
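The disposal this relies on comes from how `await foreach` is compiled. The sketch below writes out a simplified version of that pattern by hand (the real compiler output also deals with `ConfigureAwait`, `WithCancellation` and pattern-based enumerators). `CountingSource` and `ConsumeUntil` are hypothetical names used only for illustration. Because `DisposeAsync` is called from a `finally` block, a `break` in the loop body still disposes the enumerator.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical hand-written source: yields 1, 2, 3, ... forever and
// records whether its enumerator was disposed.
public sealed class CountingSource : IAsyncEnumerable<int>, IAsyncEnumerator<int>
{
    public bool Disposed { get; private set; }
    public int Current { get; private set; }

    public IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default) => this;

    public ValueTask<bool> MoveNextAsync()
    {
        Current++;
        return new ValueTask<bool>(true);
    }

    public ValueTask DisposeAsync()
    {
        Disposed = true;
        return default;
    }
}

public static class AwaitForeachSketch
{
    // Roughly what `await foreach (var item in source) { ...; if (stop(item)) break; }`
    // expands to (simplified): the enumerator is disposed in a finally block,
    // so leaving the loop early with break still calls DisposeAsync.
    public static async Task<List<int>> ConsumeUntil(
        IAsyncEnumerable<int> source, Func<int, bool> stop)
    {
        var consumed = new List<int>();
        IAsyncEnumerator<int> e = source.GetAsyncEnumerator();
        try
        {
            while (await e.MoveNextAsync())
            {
                int item = e.Current;
                consumed.Add(item);
                if (stop(item)) break;
            }
        }
        finally
        {
            await e.DisposeAsync(); // the producer's only signal that the consumer left
        }
        return consumed;
    }
}
```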

Example. The web method checks whether a url is valid. There is a never-ending stream of urls, but the caller stops enumerating the results when more than 2 invalid urls are found:

var invalidCount = 0;
await foreach (var result in ValidateUrls(GetMockUrls()))
{
    Console.WriteLine($"Url {result.Url} is "
        + (result.IsValid ? "OK" : "Invalid!"));
    if (!result.IsValid) invalidCount++;
    if (invalidCount > 2) break;
}
Console.WriteLine($"--Async enumeration finished--");
await Task.Delay(2000);

The generator of the urls. One url is generated every 300 msec.

private static async IAsyncEnumerable<string> GetMockUrls()
{
    int index = 0;
    while (true)
    {
        await Task.Delay(300);
        yield return $"https://mock.com/{++index:0000}";
    }
}

The validator of the urls. There is a requirement that the input stream be enumerated eagerly, so two asynchronous workflows run in parallel. The first workflow inserts the urls into a queue, and the second workflow picks the urls one by one and validates them. A BufferBlock is used as an async queue.

private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
    this IAsyncEnumerable<string> urls)
{
    var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
    _ = Task.Run(async () =>
    {
        await foreach (var url in urls)
        {
            Console.WriteLine($"Url {url} received");
            await buffer.SendAsync(url);
        }
        buffer.Complete();
    });

    while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
    {
        yield return (url, await MockValidateUrl(url));
    }
}

Clarification: the queue is mandatory, and removing it is not an option. It is an essential component of this problem.

The validator of a single url. The validation process lasts about 350 msec on average (between 100 and 600 msec).

private static Random _random = new Random();
private static async Task<bool> MockValidateUrl(string url)
{
    await Task.Delay(_random.Next(100, 600));
    return _random.Next(0, 2) != 0;
}

Output:

Url https://mock.com/0001 received
Url https://mock.com/0001 is Invalid!
Url https://mock.com/0002 received
Url https://mock.com/0003 received
Url https://mock.com/0002 is OK
Url https://mock.com/0004 received
Url https://mock.com/0003 is Invalid!
Url https://mock.com/0005 received
Url https://mock.com/0004 is OK
Url https://mock.com/0005 is OK
Url https://mock.com/0006 received
Url https://mock.com/0006 is Invalid!
--Async enumeration finished--
Url https://mock.com/0007 received
Url https://mock.com/0008 received
Url https://mock.com/0009 received
Url https://mock.com/0010 received
Url https://mock.com/0011 received
Url https://mock.com/0012 received
...

The problem is that urls are still generated and received after the caller/client has finished the asynchronous enumeration. I would like to fix this, so that no more messages appear in the console after --Async enumeration finished--.


Solution

  • I suppose I should answer my own question, since I now have a simple enough generic solution.

    Update: I am scrapping my previous answer because I discovered a much easier solution. It is embarrassingly simple, actually. All I have to do is enclose the yielding part of the ValidateUrls iterator in a try-finally block. The finally block is executed in every case, provided the caller disposes the enumerator (which await foreach always does): when the caller completes the enumeration normally, or when it ends it abnormally with a break or an exception. So this is how I can get the notification I am looking for, by cancelling a CancellationTokenSource in the finally block:

    private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
        this IAsyncEnumerable<string> urls)
    {
        var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
        var completionCTS = new CancellationTokenSource();
        _ = Task.Run(async () =>
        {
            await foreach (var url in urls)
            {
                if (completionCTS.IsCancellationRequested) break;
                Console.WriteLine($"Url {url} received");
                await buffer.SendAsync(url);
            }
            buffer.Complete();
        });
    
        try
        {
            while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
            {
                yield return (url, await MockValidateUrl(url));
            }
        }
        finally // Runs when the caller finishes: normal completion, break or exception
        {
            completionCTS.Cancel();
        }
    }
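    The claim that the finally block runs in all three cases can be checked with a small, dependency-free sketch. `FinallyDemo` and `Numbers` are hypothetical names; the `log` list records the order in which the iterator's finally block and the consumer's code run.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class FinallyDemo
{
    // Hypothetical iterator: yields 1..count and logs when its finally block runs.
    public static async IAsyncEnumerable<int> Numbers(int count, List<string> log)
    {
        try
        {
            for (int i = 1; i <= count; i++)
            {
                await Task.Yield();
                yield return i;
            }
        }
        finally
        {
            // Runs on normal completion, and also when the consumer's
            // DisposeAsync call unwinds the iterator after a break or an exception.
            log.Add("finally");
        }
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // 1. The consumer enumerates to the end.
        await foreach (var n in Numbers(3, log)) { }
        log.Add("completed");

        // 2. The consumer breaks early.
        await foreach (var n in Numbers(3, log))
        {
            if (n == 2) break;
        }
        log.Add("break");

        // 3. The consumer throws.
        try
        {
            await foreach (var n in Numbers(3, log))
            {
                if (n == 2) throw new InvalidOperationException();
            }
        }
        catch (InvalidOperationException)
        {
            log.Add("exception");
        }

        return log;
    }
}
```

    Running `FinallyDemo.RunAsync()` should produce `finally, completed, finally, break, finally, exception`: in each case the iterator's finally block runs before control reaches the code after the loop.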
    

    I should probably note that an async iterator that doesn't support cancellation is not good practice. Without cancellation, the caller has no easy way to interrupt the wait between consuming one value and the next. So a better signature for my method would be:

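    // [EnumeratorCancellation] lives in System.Runtime.CompilerServices
    private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
        this IAsyncEnumerable<string> urls,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // ... same body as above ...
    }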

    The token could then be passed to the awaited methods of the yielding loop: OutputAvailableAsync and MockValidateUrl (the latter would need a CancellationToken parameter of its own).
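    Here is a sketch of what the cancellation-aware version might look like, with the token passed to OutputAvailableAsync and MockValidateUrl. It assumes the System.Threading.Tasks.Dataflow package is referenced (depending on the target framework it may need an explicit package reference), and it adds a few things that are not in the code above: a CancellationToken parameter on MockValidateUrl, WithCancellation on the input stream, and forwarding of producer errors into the queue. The class name `UrlValidation` is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UrlValidation
{
    private static readonly Random _random = new Random();

    // Same mock generator as in the question.
    public static async IAsyncEnumerable<string> GetMockUrls()
    {
        int index = 0;
        while (true)
        {
            await Task.Delay(300);
            yield return $"https://mock.com/{++index:0000}";
        }
    }

    // Same mock validator, plus a token parameter (an addition of this sketch).
    private static async Task<bool> MockValidateUrl(string url, CancellationToken token)
    {
        await Task.Delay(_random.Next(100, 600), token);
        return _random.Next(0, 2) != 0;
    }

    public static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
        this IAsyncEnumerable<string> urls,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var buffer = new BufferBlock<string>();
        var completionCts = new CancellationTokenSource(); // signals "the consumer has left"
        var producerToken = completionCts.Token;

        _ = Task.Run(async () =>
        {
            try
            {
                // WithCancellation (not in the original) forwards the token to the
                // input enumerator; it only helps if that source observes the token.
                await foreach (var url in urls.WithCancellation(producerToken))
                {
                    if (producerToken.IsCancellationRequested) break;
                    Console.WriteLine($"Url {url} received");
                    await buffer.SendAsync(url, producerToken);
                }
                buffer.Complete();
            }
            catch (OperationCanceledException)
            {
                buffer.Complete(); // the consumer has left; just stop producing
            }
            catch (Exception ex)
            {
                ((IDataflowBlock)buffer).Fault(ex); // input error: fault the queue
            }
        });

        try
        {
            // The caller's token interrupts both awaits of the yielding loop.
            while (await buffer.OutputAvailableAsync(cancellationToken)
                && buffer.TryReceive(out var url))
            {
                yield return (url, await MockValidateUrl(url, cancellationToken));
            }
            await buffer.Completion; // rethrows if the producer faulted the queue
        }
        finally
        {
            completionCts.Cancel(); // runs on completion, break, exception or cancellation
        }
    }
}
```

    With this design, cancelling the caller's token makes the yielding loop throw an OperationCanceledException, and the finally block then stops the producer, just as a break would.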

    From the caller's perspective, the token can be passed either directly, or by chaining the extension method WithCancellation.

    await foreach (var result in ValidateUrls(GetMockUrls()).WithCancellation(token))
    { /* ... */ }
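    A small, dependency-free sketch of both ways to supply the token, and of a wait that only the token can interrupt. `TokenFlowDemo`, `ObserveToken` and `Slow` are hypothetical names; `CanBeCanceled` is used only to detect whether a real token reached the iterator.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class TokenFlowDemo
{
    // Hypothetical iterator that reports whether it received a cancelable token.
    public static async IAsyncEnumerable<bool> ObserveToken(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await Task.Yield();
        yield return cancellationToken.CanBeCanceled;
    }

    // Hypothetical slow iterator: the second item only arrives after a wait
    // that nothing but the token can interrupt.
    public static async IAsyncEnumerable<int> Slow(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        yield return 1;
        await Task.Delay(Timeout.Infinite, cancellationToken);
        yield return 2;
    }

    public static async Task<(bool Direct, bool Chained, bool None)> ObserveAsync()
    {
        using var cts = new CancellationTokenSource();
        bool direct = false, chained = false, none = true;

        // 1. Token passed directly as an argument.
        await foreach (var b in ObserveToken(cts.Token)) direct = b;

        // 2. Token chained with WithCancellation; the compiler-generated
        //    enumerator hands it to the [EnumeratorCancellation] parameter.
        await foreach (var b in ObserveToken().WithCancellation(cts.Token)) chained = b;

        // 3. No token: the parameter stays CancellationToken.None.
        await foreach (var b in ObserveToken()) none = b;

        return (direct, chained, none);
    }

    public static async Task<List<int>> InterruptAsync()
    {
        var received = new List<int>();
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200));
        try
        {
            await foreach (var n in Slow().WithCancellation(cts.Token)) received.Add(n);
        }
        catch (OperationCanceledException)
        {
            // Expected: the pending Task.Delay was cancelled after 200 ms.
        }
        return received;
    }
}
```

    Expected outcome: the token arrives in the first two cases but not in the third, and InterruptAsync returns a single item because the pending Task.Delay is cancelled after 200 ms. If both a direct token and WithCancellation are supplied, the compiler-generated code combines them into a linked token.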