Tags: c#, asynchronous, async-await, task, iasyncenumerable

Getting a 2nd `IAsyncEnumerator<>` from the same `IAsyncEnumerable<>` returned by the `Task.WhenEach<>` method


Given an IAsyncEnumerable<>, it generally works fine (even if it may not be optimal performance-wise) to call GetAsyncEnumerator more than once, getting a new IAsyncEnumerator<> each time. As an explicit example (all code in this question is C#), suppose I have this method:

static async IAsyncEnumerable<string> GetStr() {
    var item = await Task.FromResult("Bravo");
    yield return "Alfa";
    yield return item;
    yield return "Charlie";
}

then it works fine to do this:

IAsyncEnumerable<string> oneEnumerable = GetStr();
await foreach (var s in oneEnumerable) {
    Console.WriteLine(s);
}
Console.WriteLine("Between");
await foreach (var t in oneEnumerable) {
    Console.WriteLine(t);
}
Console.WriteLine("End");

You can even keep two enumerators from the same enumerable alive at the same time, without disposing the first before acquiring the second:

IAsyncEnumerable<string> oneEnumerable = GetStr();
IAsyncEnumerator<string> e = oneEnumerable.GetAsyncEnumerator();
IAsyncEnumerator<string> f = oneEnumerable.GetAsyncEnumerator();
bool c = await f.MoveNextAsync();
bool b = await e.MoveNextAsync();

Console.WriteLine($"b {b} with {e.Current}");
Console.WriteLine($"c {c} with {f.Current}");
await e.DisposeAsync();
await f.DisposeAsync();

The above code works as expected: the two enumerators are independent and do not mix up their states.


That was just an introduction. What I want to ask about is a case where acquiring two enumerators from the same enumerable leads to a surprising result. Consider this method based on Task.WhenEach:

static IAsyncEnumerable<Task<int>> GetTasks() {
    IEnumerable<Task<int>> source = [Task.FromResult(7), Task.FromResult(9), Task.FromResult(13)];
    return Task.WhenEach(source);
}

and use this code:

IAsyncEnumerable<Task<int>> oneEnumerable = GetTasks();
await foreach (var t in oneEnumerable) {
    Console.WriteLine(t);
}
Console.WriteLine("Between");
await foreach (var u in oneEnumerable) {
    Console.WriteLine(u);
}
Console.WriteLine("End");

This runs without an exception, BUT the second enumeration yields zero items (the body of the foreach over u never runs)!
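A minimal, self-contained way to reproduce this is to count the items produced by each pass over the same enumerable. This sketch assumes .NET 9 or later (where Task.WhenEach exists) and a console program using top-level statements; CountAsync is a hypothetical helper, not a library API. Given the behavior described above, the first count should be 3 and the second 0.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Same source as GetTasks() in the question (Task.WhenEach requires .NET 9+).
IEnumerable<Task<int>> source = new[] { Task.FromResult(7), Task.FromResult(9), Task.FromResult(13) };
IAsyncEnumerable<Task<int>> oneEnumerable = Task.WhenEach(source);

// Enumerate the very same IAsyncEnumerable<> object twice.
int firstCount = await CountAsync(oneEnumerable);
int secondCount = await CountAsync(oneEnumerable);
Console.WriteLine($"first: {firstCount}, second: {secondCount}");

// Hypothetical helper: runs one full enumeration and counts the items it yields.
static async Task<int> CountAsync<T>(IAsyncEnumerable<T> items)
{
    int count = 0;
    await foreach (var item in items)
    {
        count++;
    }
    return count;
}
```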


My questions:

  • Is this the expected behavior of Task.WhenEach?
  • Is this behavior documented?

I find this very error-prone. If for some technical reason it is impossible to enumerate an enumerable more than once, it would be much nicer if the second call to GetAsyncEnumerator threw an exception (or, alternatively, if the first call to MoveNextAsync on the second enumerator threw, or if awaiting the ValueTask<bool> returned by MoveNextAsync threw).

(I was shown another thread where an IAsyncEnumerable<> produced by CsvReader's GetRecordsAsync<> method had a similar issue.)


Solution

  • It doesn't appear to be documented yet, but the code comments explicitly call this out:

    // The enumerable could have GetAsyncEnumerator called on it multiple times.
    // As we're dealing with Tasks that only ever transition from non-completed
    // to completed, re-enumeration doesn't have much benefit, so we take advantage
    // of the optimizations possible by not supporting that and simply have the
    // semantics that, no matter how many times the enumerable is enumerated, every
    // task is yielded only once. The original GetAsyncEnumerator call will give back
    // all the tasks, and all subsequent iterations will be empty.
    if (waiter?.TryStart() is not true)
    {
        yield break;
    }
    

    Also:

    /// <summary>0 if this has never been used in an iteration; 1 if it has.</summary>
    /// <remarks>This is used to ensure we only ever iterate through the tasks once.</remarks>
    private int _enumerated;
    
    /// <summary>Called at the beginning of the iterator to assume ownership of the state.</summary>
    /// <returns>true if the caller owns the state; false if the caller should end immediately.</returns>
    public bool TryStart() => Interlocked.Exchange(ref _enumerated, 1) == 0;
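    To see why the second enumeration ends quietly instead of throwing, here is a simplified model of the same pattern, applied to the question's "two live enumerators" example. It is a sketch, not the runtime's code: YieldOnlyOnce is a hypothetical name, and a captured int plus Interlocked.Exchange stands in for the _enumerated field and TryStart(). Because the flag belongs to the enumerable rather than to each enumerator, GetAsyncEnumerator itself never fails; whichever enumerator calls MoveNextAsync first claims the state, and every other one simply finishes with false.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    IAsyncEnumerable<string> names = YieldOnlyOnce(new[] { "Alfa", "Bravo", "Charlie" });

    // Two enumerators from the same enumerable, as in the question's introduction.
    IAsyncEnumerator<string> e = names.GetAsyncEnumerator();
    IAsyncEnumerator<string> f = names.GetAsyncEnumerator();
    bool c = await f.MoveNextAsync(); // f starts first, so it claims the shared flag
    string fFirst = f.Current;
    bool b = await e.MoveNextAsync(); // e finds the flag already set and ends at once
    await e.DisposeAsync();
    await f.DisposeAsync();
    Console.WriteLine($"b {b}, c {c} with {fFirst}");

    // Hypothetical model of the pattern (not the actual Task.WhenEach implementation).
    static IAsyncEnumerable<T> YieldOnlyOnce<T>(IEnumerable<T> items)
    {
        // One flag per enumerable, shared by all of its enumerators (like _enumerated).
        int enumerated = 0;
        return Iterate();

        async IAsyncEnumerable<T> Iterate()
        {
            // Same idea as TryStart(): only the caller that flips 0 -> 1 proceeds.
            if (Interlocked.Exchange(ref enumerated, 1) != 0)
            {
                yield break; // later enumerations are silently empty, no exception
            }
            foreach (T item in items)
            {
                await Task.Yield(); // stand-in for awaiting real asynchronous work
                yield return item;
            }
        }
    }
    ```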
    

    I agree that this is non-obvious and should be documented; ideally, it would allow multiple enumerations. I suggest you open a feature request on GitHub for this.

    You should be able to work around this by writing your own iterator method that wraps Task.WhenEach. Example:

    public static async IAsyncEnumerable<T> WhenEach<T>(
        IEnumerable<T> tasks,
        [EnumeratorCancellation] CancellationToken cancellationToken = default) where T : Task
    {
        ArgumentNullException.ThrowIfNull(tasks);
        await foreach (T task in Task.WhenEach(tasks)
            .WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            yield return task;
        }
    }
    

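    This works because C# iterator methods create enumerables that run the method body from the start each time a new enumeration begins, so every await foreach over the wrapper calls Task.WhenEach again and gets a fresh, unclaimed sequence.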
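    A quick check of the wrapper, assuming .NET 9+ and a top-level-statements console program (the wrapper is copied in as a local function so the sketch is self-contained): enumerating the same wrapped enumerable twice should yield all three tasks both times, with results summing to 7 + 9 + 13 = 29 on each pass.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Runtime.CompilerServices;
    using System.Threading;
    using System.Threading.Tasks;

    IEnumerable<Task<int>> source = new[] { Task.FromResult(7), Task.FromResult(9), Task.FromResult(13) };
    IAsyncEnumerable<Task<int>> reusable = WhenEach(source);

    int firstCount = 0, firstSum = 0, secondCount = 0, secondSum = 0;
    await foreach (Task<int> t in reusable)
    {
        firstCount++;
        firstSum += await t; // the task is already completed when yielded
    }
    await foreach (Task<int> t in reusable)
    {
        secondCount++;
        secondSum += await t;
    }
    Console.WriteLine($"first: {firstCount} items, sum {firstSum}; second: {secondCount} items, sum {secondSum}");

    // The wrapper from this answer, written as a local function.
    static async IAsyncEnumerable<T> WhenEach<T>(
        IEnumerable<T> tasks,
        [EnumeratorCancellation] CancellationToken cancellationToken = default) where T : Task
    {
        ArgumentNullException.ThrowIfNull(tasks);
        // Each new enumeration runs this body again, so Task.WhenEach is called afresh.
        await foreach (T task in Task.WhenEach(tasks)
            .WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            yield return task;
        }
    }
    ```

    One consequence of this design: because each enumeration calls Task.WhenEach(tasks) again, the tasks sequence itself is also enumerated again. That is harmless for an array of already-created tasks, but if tasks is a lazy query that starts work (for example, a Select that calls an async method), each enumeration would start a new set of tasks.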