It appears that disposing a generated IAsyncEnumerable's IAsyncEnumerator does not trigger the cancellation token of the underlying IAsyncEnumerable.
Specifically, this unit test fails its only assertion:
    [Test]
    public async Task test()
    {
        await foreach (var value in TestData())
            break;
    }

    private static async IAsyncEnumerable<int> TestData([EnumeratorCancellation] CancellationToken ct = default)
    {
        try
        {
            yield return 1;
            await Task.CompletedTask;
            yield return 2;
        }
        finally
        {
            Assert.That(ct.IsCancellationRequested, Is.True);
        }
    }
I'm extremely surprised by this behaviour. Is there any standard approach to have TestData's CancellationToken cancelled in this scenario? I could implement a custom operator like the one below, but that feels extremely clunky, so I'm hoping there's a better way of doing this.
    [Test]
    public async Task this_test_passes()
    {
        await foreach (var value in CancelWhenUnsubscribed(TestData()))
            break;
    }

    public static async IAsyncEnumerable<T> CancelWhenUnsubscribed<T>(IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken ct = default)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        await using var en = source.GetAsyncEnumerator(cts.Token);
        using var _ = Disposable.Create(() => cts.Cancel());
        while (await en.MoveNextAsync())
            yield return en.Current;
    }
For context on why I'm trying to do this: I am using System.Interactive.Async to merge a few IAsyncEnumerable sequences. Its implementation works in such a way that when the merged sequence is disposed, it first waits for any in-flight MoveNext tasks to complete, and only then disposes and terminates the merged sequence.
In my specific case, that means I need to cancel the underlying IAsyncEnumerables when the merged one is terminated, otherwise I'll run into quasi-deadlocks. As a simple example, this test gets stuck:
    [Test]
    public async Task this_test_gets_stuck()
    {
        await foreach (var value in AsyncEnumerableEx.Merge([TestData(), TestData()]))
            break;
    }

    private static async IAsyncEnumerable<int> TestData([EnumeratorCancellation] CancellationToken ct = default)
    {
        yield return 1;
        await Task.Delay(TimeSpan.FromHours(100), ct);
        yield return 2;
    }
I don't get why you are surprised. The TestData iterator doesn't own the CancellationToken, so it's not its responsibility to cancel it, even if it could. Actually it can't cancel it, for reasons discussed in this question:
If you invoke TestData without passing a CancellationToken and without using the WithCancellation extension method, ct.CanBeCanceled will be false. In that case it would be absurd to expect the automatic cancellation of a token that cannot be canceled.
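To make the point about ct.CanBeCanceled concrete, here is a minimal sketch. The Probe iterator and the Demo class are hypothetical names used only for illustration; they are not part of the question's code. It assumes a compiler and runtime with async streams support (C# 8 or later).

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class Demo
{
    // Hypothetical iterator: yields whether the token it received can ever be canceled.
    private static async IAsyncEnumerable<bool> Probe(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await Task.Yield();
        yield return ct.CanBeCanceled;
    }

    public static async Task Main()
    {
        // No token is supplied anywhere, so the iterator sees CancellationToken.None.
        await foreach (var canBeCanceled in Probe())
            Console.WriteLine(canBeCanceled); // expected: False

        // The consumer owns the CancellationTokenSource and passes its token in.
        using var cts = new CancellationTokenSource();
        await foreach (var canBeCanceled in Probe().WithCancellation(cts.Token))
            Console.WriteLine(canBeCanceled); // expected: True
    }
}
```

In the first loop there is simply nothing that could be canceled on disposal. In the second loop the token can be canceled, but only by whoever holds cts, which is the consumer and not the iterator.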