I have an async iterator method that produces an IAsyncEnumerable<int> (a stream of numbers), one number every 200 msec. The caller of this method consumes the stream, but wants to stop the enumeration after 1000 msec. So a CancellationTokenSource is used, and its token is passed as an argument to the WithCancellation extension method. But the token is not respected. The enumeration continues until all the numbers are consumed:
    static async IAsyncEnumerable<int> GetSequence()
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200);
            yield return i;
        }
    }

    var cts = new CancellationTokenSource(1000);
    await foreach (var i in GetSequence().WithCancellation(cts.Token))
    {
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
    }
Output:
12:55:17.506 > 1
12:55:17.739 > 2
12:55:17.941 > 3
12:55:18.155 > 4
12:55:18.367 > 5
12:55:18.570 > 6
12:55:18.772 > 7
12:55:18.973 > 8
12:55:19.174 > 9
12:55:19.376 > 10
I expected a TaskCanceledException to be thrown after number 5. It seems that I have misunderstood what WithCancellation actually does. The method just passes the supplied token to the iterator method, if that method accepts one. Otherwise, as with the GetSequence() method in my example, the token is ignored. I suppose that the solution in my case is to check the token manually inside the body of the enumeration:
    var cts = new CancellationTokenSource(1000);
    await foreach (var i in GetSequence())
    {
        cts.Token.ThrowIfCancellationRequested();
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > {i}");
    }
This is simple and works well. But in any case I wonder if it would be possible to create an extension method that does what I expected WithCancellation to do: bake the token into the ensuing enumeration. This is the signature of the needed method:
    public static IAsyncEnumerable<T> WithEnforcedCancellation<T>(
        this IAsyncEnumerable<T> source, CancellationToken cancellationToken)
    {
        // Is it possible?
    }
IAsyncEnumerable explicitly provides for this mechanism with the EnumeratorCancellation attribute:
    // EnumeratorCancellation lives in System.Runtime.CompilerServices.
    static async IAsyncEnumerable<int> GetSequence([EnumeratorCancellation] CancellationToken ct = default) {
        for (int i = 1; i <= 10; i++) {
            ct.ThrowIfCancellationRequested();
            await Task.Delay(200); // or `Task.Delay(200, ct)` if this wasn't an example
            yield return i;
        }
    }
In fact, the compiler is helpful enough to issue a warning if you give the method a CancellationToken parameter, but do not add the attribute.
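To see the attribute in action from the caller's side, here is a minimal sketch. It assumes the iterator also forwards the token to Task.Delay, so a pending delay is interrupted rather than waited out. The class name CancellationDemo and the helper ConsumeAsync are hypothetical, introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Cancellation-aware iterator: the token supplied via WithCancellation
    // arrives in the parameter marked with [EnumeratorCancellation].
    public static async IAsyncEnumerable<int> GetSequence(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(200, ct); // throws TaskCanceledException once ct is cancelled
            yield return i;
        }
    }

    // Hypothetical helper: collects the numbers received before the timeout fires.
    public static async Task<List<int>> ConsumeAsync(int timeoutMs)
    {
        var received = new List<int>();
        using var cts = new CancellationTokenSource(timeoutMs);
        try
        {
            await foreach (var i in GetSequence().WithCancellation(cts.Token))
            {
                received.Add(i);
            }
        }
        catch (OperationCanceledException)
        {
            // TaskCanceledException derives from OperationCanceledException,
            // so this also covers a cancelled Task.Delay.
        }
        return received;
    }
}
```

Calling `await CancellationDemo.ConsumeAsync(1000)` should return only the first few numbers instead of all ten; the exact count depends on timer resolution.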
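Note that if a token is passed directly to the method and a different one is passed to .WithCancellation, the compiler-generated code combines the two into a linked token, so cancelling either one cancels the enumeration. If only one of them is supplied, that one is used. The specs have the details on this.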
Of course, this will still only work if the enumeration actually accepts a CancellationToken -- but the fact that cancellation only really works if done cooperatively is true of any async work.
Yeldar's answer is good for "forcing" some measure of cancellation into an enumerable that doesn't support it, but the preferred solution should be to modify the enumeration to support cancellation by itself -- the compiler does everything to help you out.
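For completeness, one way a wrapper with the signature from the question might look is sketched below. This is only an illustration and not necessarily the approach taken in the other answer. The containing class EnforcedCancellationExtensions is a hypothetical name, and the [EnumeratorCancellation] attribute is added to the question's signature so that a token supplied later through WithCancellation is honoured as well. The wrapper checks the token each time the source produces an item, so it cannot interrupt a wait that is already in progress inside a non-cooperative source.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class EnforcedCancellationExtensions
{
    // Sketch: re-yields the items of `source` and throws
    // OperationCanceledException once the token has been cancelled.
    public static async IAsyncEnumerable<T> WithEnforcedCancellation<T>(
        this IAsyncEnumerable<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // Still forward the token, in case the source does support cancellation.
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            // Checked only when an item arrives: a pending await inside
            // a source that ignores the token is not interrupted.
            cancellationToken.ThrowIfCancellationRequested();
            yield return item;
        }
    }
}
```

With the original GetSequence() from the question, `await foreach (var i in GetSequence().WithEnforcedCancellation(cts.Token))` should throw shortly after the 1000 msec timeout, when the next number arrives, rather than running through all ten.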