Search code examples
c#async-awaitcancellation.net-5iasyncenumerable

What is the correct way to pass a cancellation token to an async stream?


For a while I've been trying to get my head around the whole async/await model that C# uses for asynchronous code. The addition of async streams (the IAsyncEnumerable<T> type) seemed really cool, especially for some code that I was writing.

Best practice when creating an async method is to include a CancellationToken parameter and use it for cancelling your async processes. (Ideally by passing it to the underlying async method calls used in your method.)

When creating a method that returns an async stream (an IAsyncEnumerable<T>) the documentation states that your CancellationToken parameter should be decorated with the [EnumeratorCancellation] attribute and then the token passed using the .WithCancellation() method on the IAsyncEnumerable<T> itself.

However, I must be doing something wrong because this still triggers warning:

CA2016: Forward the CancellationToken parameter to methods that take one

This warning appears regardless of if I do it the more standard way:

async IAsyncEnumerable<aThingo> GetFlibbityStream([EnumeratorCancellation] CancellationToken cancellationToken = default) {
    aThingo slowValue = null;
    do {
        aThingo slowValue = await GetThatThingo(cancellationToken);
        yield return slowValue;
    while (slowValue != null);
}


async Task DoingStuff(CancellationToken cancellationToken) {
    await foreach(var thng in ThingStreamCreator.GetFlibbityStream().WithCancellation(cancellationToken)) {
        CrushThatThing(thng);
    }
}

Or at points where I need to get the AsyncEnumerator itself (because I need to iterate through two async streams together, but not necessarily at the same rate.)

async Task ValidatingThingsAsync(CancellationToken cancellationToken) {
    await using IAsyncEnumerator<aThingo> srcEnumerator = source.ThingValidityAsyncStream(dateCutOff).GetAsyncEnumerator(cancellationToken);
    ... streamy stuff ....
}

All of my async stream methods have a default value for the CancellationToken which makes them optional parameters. I think perhaps part of my problem is the WithCancellation() method is intended for use cases where you already have the IAsyncStream<T> but didn't necessarily pass the cancellation token to it. But that doesn't entirely make sense and it feels like I am either passing in the cancellation token too often or not enough (or doing the wrong one of those when I should be doing the other.)

Am I simply misusing WithCancellation() and GetAsyncEnumerator() by unnecessarily passing the cancellation token when I should simply be passing it to the async stream method directly in these cases?

Basically I should not be using WithCancellation() and I shouldn't be passing anything to GetAsyncEnumerator() and should instead remove the default value for the CancellationToken on my async stream methods and pass the token directly to them. Basically I think I'm confused by the number of different ways to pass a CancellationToken to an async stream and identifying which is the correct method to use at the time...


Solution

  • According to the specification:

    There are two main consumption scenarios:

    1. await foreach (var i in GetData(token)) ... where the consumer calls the async-iterator method,
    2. await foreach (var i in givenIAsyncEnumerable.WithCancellation(token)) ... where the consumer deals with a given IAsyncEnumerable instance.

    You're calling GetFlibbityStream method, so this is the case #1. You should pass CancellationToken directly to the method and should not chain GetFlibbityStream with WithCancellation. Otherwise rule analyzer for CA2016 will emit warning, and it will be right.

    WithCancellation is intended for the case #2. For example, there is some library type with property or method, which returns IAsyncEnumerable<T> and does not allow to pass CancellationToken directly.

    Like this one:

    public interface IFlibbityService
    {
        IAsyncEnumerable<aThingo> FlibbityStream { get; }
    }
    

    This still supports cancellation, but the only way to pass token to IFlibbityService.FlibbityStream is to use WithCancellation:

    await foreach(var thng in flibbityService.FlibbityStream.WithCancellation(cancellationToken))
    {
        // ...
    }
    

    Back to your code, just throw away WithCancellation and pass token directly:

    await foreach(var thng in ThingStreamCreator.GetFlibbityStream(cancellationToken))
    {
    }