Tags: c#, .net, async-await, .net-6.0, iasyncenumerable

How does the Take() method work with IAsyncEnumerable?


I have recently been playing around with IAsyncEnumerable, and there is one thing whose behind-the-scenes workings I can't understand.

Consider the following piece of code:

private async Task Test()
{
    var t = new List<int>();

    await foreach (var number in Numbers().Take(5).ConfigureAwait(false))
    {
        t.Add(number);
    }
}

int[] myIntArray = new int[10] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

private async IAsyncEnumerable<int> Numbers()
{
    foreach (var element in myIntArray)
    {
        yield return element;
    }
}

What bothers me is this: how does the IAsyncEnumerable method know to execute only 5 times? In other words, how is the .Take(5) call translated so that the number passed in is taken into account during execution?


Solution

  • IAsyncEnumerable<T> is a simple interface with a single method, GetAsyncEnumerator(CancellationToken), which returns an IAsyncEnumerator<T>. That interface is not much more complicated: it has a MoveNextAsync method (advances the enumerator asynchronously to the next element of the collection) and a Current property (gets the element in the collection at the current position of the enumerator), much like the "ordinary" IEnumerable<T> and its IEnumerator<T>. In a nutshell, all LINQ methods operate on those two members (leaving aside optimizations for some corner cases).
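
    To make those two members concrete, here is a minimal sketch of roughly what await foreach does by hand. The class name ManualEnumerationSketch and the three-element Numbers source are made up for illustration; the compiler-generated code differs in its details.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    public static class ManualEnumerationSketch
    {
        // Hypothetical source, standing in for any IAsyncEnumerable<int>.
        public static async IAsyncEnumerable<int> Numbers()
        {
            for (var i = 1; i <= 3; i++)
            {
                await Task.Yield(); // simulate asynchronous work
                yield return i;
            }
        }

        // Roughly what `await foreach (var n in Numbers())` does by hand.
        public static async Task<List<int>> CollectAsync()
        {
            var result = new List<int>();
            IAsyncEnumerator<int> enumerator = Numbers().GetAsyncEnumerator();
            try
            {
                // Each MoveNextAsync call runs the source up to its next yield.
                while (await enumerator.MoveNextAsync())
                {
                    result.Add(enumerator.Current);
                }
            }
            finally
            {
                // The enumerator is always disposed, even on early exit.
                await enumerator.DisposeAsync();
            }
            return result;
        }

        public static async Task Main()
        {
            var items = await CollectAsync();
            Console.WriteLine(string.Join(", ", items)); // prints "1, 2, 3"
        }
    }
    ```

    The consumer drives everything here: the source only advances when MoveNextAsync is called, which is what lets an operator such as Take stop it simply by not asking for more.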

    As of .NET 6 there is no framework-provided LINQ for async enumerables, AFAIK. If you are using the one from System.Linq.Async (part of Reactive Extensions), then Take is implemented basically the same way as the "ordinary" Take, which wraps the source enumerable in a special "partition" class with an overridden MoveNext (source code). For async enumerables the class is AsyncEnumerablePartition (source code), which, when enumerated, basically calls await MoveNextAsync(...) until the limit passed to Take is reached and then stops. A simplified version can look something like the following:

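    public static class Exts
    {
        // Simplified sketch: yields at most n elements, then stops pulling from the source.
        public static async IAsyncEnumerable<int> Take(this IAsyncEnumerable<int> source, int n)
        {
            // await using disposes the source enumerator when iteration ends, including early exit.
            await using var enumerator = source.GetAsyncEnumerator();
            while (n > 0)
            {
                if (await enumerator.MoveNextAsync())
                {
                    yield return enumerator.Current;
                    n--;
                }
                else
                {
                    // The source ran out before n elements were produced.
                    yield break;
                }
            }
        }
    }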
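
    To check that the source really stops after the requested number of elements, a counter can be added to it. This is a minimal sketch: TakeLazinessSketch, Produced and TakeSketch are hypothetical names, and TakeSketch is a stand-in written with await foreach (named differently so it does not clash with System.Linq.Async), not the library's implementation.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    public static class TakeLazinessSketch
    {
        // Counts how many elements the source actually produced.
        public static int Produced;

        public static async IAsyncEnumerable<int> Numbers()
        {
            for (var i = 1; i <= 10; i++)
            {
                await Task.Yield(); // simulate asynchronous work
                Produced++;
                yield return i;
            }
        }

        // Hypothetical stand-in for Take: stops pulling once count elements were yielded.
        public static async IAsyncEnumerable<T> TakeSketch<T>(this IAsyncEnumerable<T> source, int count)
        {
            if (count <= 0) yield break;
            await foreach (var item in source)
            {
                yield return item;
                if (--count == 0) yield break; // leaving the loop disposes the source enumerator
            }
        }

        public static async Task<List<int>> RunAsync()
        {
            Produced = 0;
            var taken = new List<int>();
            await foreach (var number in Numbers().TakeSketch(5))
            {
                taken.Add(number);
            }
            return taken;
        }

        public static async Task Main()
        {
            var taken = await RunAsync();
            var joined = string.Join(", ", taken);
            Console.WriteLine($"{joined} (produced: {Produced})"); // prints "1, 2, 3, 4, 5 (produced: 5)"
        }
    }
    ```

    The source loop is written to run 10 times, but it is suspended at its fifth yield return and then disposed, so the remaining iterations never execute.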
    

    Note that the actual behavior depends on the async enumerable itself: for example, if it loads data in batches, it has to load a whole batch before it can yield that batch's first element.
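
    The batching caveat can be illustrated with a hypothetical source that loads 4 items at a time. BatchedSourceSketch, BatchedNumbers and Fetched are made-up names, and the break after 5 elements stands in for Take(5) so that the sketch stays self-contained.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    public static class BatchedSourceSketch
    {
        // Counts how many items the source loaded, whether or not they were consumed.
        public static int Fetched;

        // Hypothetical source that loads items in batches of batchSize.
        public static async IAsyncEnumerable<int> BatchedNumbers(int total, int batchSize)
        {
            for (var start = 1; start <= total; start += batchSize)
            {
                await Task.Yield(); // simulate one round trip per batch
                var batch = new List<int>();
                for (var i = start; i < start + batchSize && i <= total; i++)
                {
                    batch.Add(i);
                }
                Fetched += batch.Count;

                foreach (var item in batch)
                {
                    yield return item;
                }
            }
        }

        public static async Task<List<int>> TakeFiveAsync()
        {
            Fetched = 0;
            var taken = new List<int>();
            await foreach (var number in BatchedNumbers(total: 10, batchSize: 4))
            {
                taken.Add(number);
                if (taken.Count == 5) break; // stands in for Take(5)
            }
            return taken;
        }

        public static async Task Main()
        {
            var taken = await TakeFiveAsync();
            var joined = string.Join(", ", taken);
            Console.WriteLine($"{joined} (fetched: {Fetched})"); // prints "1, 2, 3, 4, 5 (fetched: 8)"
        }
    }
    ```

    Only 5 elements reach the consumer, but the fifth one lives in the second batch, so the source has already loaded 8 items by the time enumeration stops.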