I have a method that accepts an IAsyncEnumerable
as argument, and returns also an IAsyncEnumerable
. It calls a web method for each item in the input stream, and propagates the result to the output stream. My question is how can I be notified if the caller of my method has stopped enumerating the output stream, so I can stop enumerating the input stream inside my method? It seems that I should be able to be notified because the caller disposes by default the IAsyncEnumerator
that gets from my method. Is there any build-in mechanism that generates such a notification for compiler-generated async methods? If not, what is the easiest to implement alternative?
Example. The web method validates if an url is valid or not. There is a never ending stream of urls provided, but the caller stops enumerating the results when more than 2 invalid urls are found:
var invalidCount = 0;
await foreach (var result in ValidateUrls(GetMockUrls()))
{
Console.WriteLine($"Url {result.Url} is "
+ (result.IsValid ? "OK" : "Invalid!"));
if (!result.IsValid) invalidCount++;
if (invalidCount > 2) break;
}
Console.WriteLine($"--Async enumeration finished--");
await Task.Delay(2000);
The generator of the urls. One url is generated every 300 msec.
private static async IAsyncEnumerable<string> GetMockUrls()
{
int index = 0;
while (true)
{
await Task.Delay(300);
yield return $"https://mock.com/{++index:0000}";
}
}
The validator of the urls. There is a requirement that the input stream is enumerated eagerly, so two asynchronous workflows are running in parallel. The first workflow inserts the urls in a queue, and the second workflow picks the urls one by one and validates them. A BufferBlock
is used as async queue.
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
this IAsyncEnumerable<string> urls)
{
var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
_ = Task.Run(async () =>
{
await foreach (var url in urls)
{
Console.WriteLine($"Url {url} received");
await buffer.SendAsync(url);
}
buffer.Complete();
});
while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
{
yield return (url, await MockValidateUrl(url));
}
}
Clarification: the queue is mandatory, and removing it is not an option. It is an essential component of this problem.
The validator of a single url. The validation process lasts 300 msec on average.
private static Random _random = new Random();
private static async Task<bool> MockValidateUrl(string url)
{
await Task.Delay(_random.Next(100, 600));
return _random.Next(0, 2) != 0;
}
Output:
Url https://mock.com/0001 received
Url https://mock.com/0001 is Invalid!
Url https://mock.com/0002 received
Url https://mock.com/0003 received
Url https://mock.com/0002 is OK
Url https://mock.com/0004 received
Url https://mock.com/0003 is Invalid!
Url https://mock.com/0005 received
Url https://mock.com/0004 is OK
Url https://mock.com/0005 is OK
Url https://mock.com/0006 received
Url https://mock.com/0006 is Invalid!
--Async enumeration finished--
Url https://mock.com/0007 received
Url https://mock.com/0008 received
Url https://mock.com/0009 received
Url https://mock.com/0010 received
Url https://mock.com/0011 received
Url https://mock.com/0012 received
...
The problem is that urls are still generated and received after the caller/client has finished the asynchronous enumeration. I would like to fix this, so that no more messages appear in the console after --Async enumeration finished--
.
I suppose I should answer my own question, since I now have a simple enough generic solution.
Update: I am scraping my previous answer because I discovered a much easier solution. It is embarassingly simple actually. All I have to do is to enclose the yielding part of the ValidateUrls
iterator into a try-finally
block. The finally
block will be executed on every case, either by the caller completing normally the enumeration, or abnormally by a break
or an exception. So this is how I can get the notification I am looking for, by cancelling a CancellationTokenSource
on finally
:
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
this IAsyncEnumerable<string> urls)
{
var buffer = new System.Threading.Tasks.Dataflow.BufferBlock<string>();
var completionCTS = new CancellationTokenSource();
_ = Task.Run(async () =>
{
await foreach (var url in urls)
{
if (completionCTS.IsCancellationRequested) break;
Console.WriteLine($"Url {url} received");
await buffer.SendAsync(url);
}
buffer.Complete();
});
try
{
while (await buffer.OutputAvailableAsync() && buffer.TryReceive(out var url))
{
yield return (url, await MockValidateUrl(url));
}
}
finally // This runs when the caller completes the enumeration
{
completionCTS.Cancel();
}
}
I should probably note that an async iterator that doesn't support cancellation is not a good practice. Without it the caller has no easy way to stop the awaiting between the consumption of one value and the next. So a better signature for my method should be:
private static async IAsyncEnumerable<(string Url, bool IsValid)> ValidateUrls(
this IAsyncEnumerable<string> urls,
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
The token could then be passed to the awaited methods of the yielding loop, the OutputAvailableAsync
and the MockValidateUrl
.
From the caller's perspective, the token can be passed either directly, or by chaining the extension method WithCancellation
.
await foreach (var result in ValidateUrls(GetMockUrls()).WithCancellation(token))