Search code examples
c#cancellationcancellation-tokeniasyncenumerable

Disposing an IAsyncEnumerator does not cancel the underlying IAsyncEnumerable's cancellation token


It appears that disposing a generated IAsyncEnumerable's IAsyncEnumerator does not trigger the cancellation token of the underlying IAsyncEnumerable. Specifically, this unit test fails its only assertion:

[Test]
public async Task test()
{
    await foreach (var value in TestData())
        break;
}

private static async IAsyncEnumerable<int> TestData([EnumeratorCancellation] CancellationToken ct = default)
{
    try
    {
        yield return 1;
        await Task.CompletedTask;
        yield return 2;
    }
    finally
    {
        Assert.That(ct.IsCancellationRequested, Is.True);
    }
}

I'm extremely surprised by this behaviour. Is there any standard approach to have TestData's CancellationToken cancelled in this scenario? I could implement a custom operator, but that feels extremely clunky, so I'm hoping that there's some better way of doing this.

[Test]
public async Task this_test_passes()
{
    await foreach (var value in CancelWhenUnsubscribed(TestData()))
        break;
}

public static async IAsyncEnumerable<T> CancelWhenUnsubscribed<T>(IAsyncEnumerable<T> source, [EnumeratorCancellation] CancellationToken ct = default)
{
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    await using var en = source.GetAsyncEnumerator(cts.Token);
    using var _ = Disposable.Create(() => cts.Cancel());
    while (await en.MoveNextAsync())
        yield return en.Current;
}

For context why I'm trying to do this: I am using System.Interactive.Async to merge a few IAsyncEnumerable sequences.

Its implementation works in a way that when the merged sequence is disposed, it first waits for any in-flight MoveNext tasks to complete, and only then disposes and terminates the merged sequence.

In my specific case, that means I need to cancel the underlying IAsyncEnumerables when the merged one is terminated, otherwise I'll run into quasi-deadlocks. As a simple example, this test here gets stuck:

[Test]
public async Task this_test_gets_stuck()
{
    await foreach (var value in AsyncEnumerableEx.Merge([TestData(), TestData()]))
        break;
}

private static async IAsyncEnumerable<int> TestData([EnumeratorCancellation] CancellationToken ct = default)
{
    yield return 1;
    await Task.Delay(TimeSpan.FromHours(100), ct);
    yield return 2;
}

Solution

  • I don't get why you are surprised. The TestData iterator doesn't own the CancellationToken, so it's not its responsibility to cancel it, even if it could. Actually it can't cancel it, for reasons discussed in this question:

    If you invoke the TestData without passing a CancellationToken, and without using the WithCancellation extension method, the ct.CanBeCanceled will be false. In which case it would be absurd to expect the automatic cancellation of a token that cannot be canceled.