Search code examples
c#asynchronousqueueiasyncenumerable

Is there a C# class like Queue that implements IAsyncEnumerable?


Both Queue and ConcurrentQueue implement IEnumerable but not IAsyncEnumerable. Is there a standard class or class available on NuGet which implements IAsyncEnumerable such that, if the queue is empty, the result of MoveNextAsync does not complete until something next is added to the queue?


Solution

  • If you are using the .NET Core platform there are at least two built-in options:

    1. The System.Threading.Tasks.Dataflow.BufferBlock<T> class, part of the TPL Dataflow library. It doesn't implement the IAsyncEnumerable<T> natively¹, but it exposes the awaitable OutputAvailableAsync() method, doing it trivial to implement a ToAsyncEnumerable extension method.

    2. The System.Threading.Channels.Channel<T> class, the core component of the Channels library. It exposes an IAsyncEnumerable<T> implementation via its Reader.ReadAllAsync()² method.

    Both classes are also available for .NET Framework, by installing a nuget package (different for each one).

    An implementation of IAsyncEnumerable<T> for BufferBlock<T>:

    public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
        this IReceivableSourceBlock<T> source,
        [EnumeratorCancellation]CancellationToken cancellationToken = default)
    {
        while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
        {
            while (source.TryReceive(out T item))
            {
                yield return item;
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
        await source.Completion.ConfigureAwait(false); // Propagate possible exception
    }
    

    ¹ Not available until .NET 5. Available from .NET 6 and later.
    ² Not available for .NET Framework, but easy to implement in a similar way.


    .NET 6 update: This functionality is now natively available, with the extension method ReceiveAllAsync, with this signature:

    public static IAsyncEnumerable<TOutput> ReceiveAllAsync<TOutput> (
        this IReceivableSourceBlock<TOutput> source,
        CancellationToken cancellationToken = default);
    

    There are two differences though between the new API ReceiveAllAsync (source code), and the ToAsyncEnumerable shown above. The most important difference is that the exception of the source dataflow block is not propagated. So it's up to the consumer of the ReceiveAllAsync API to remember and await the Completion of the source after completing the enumeration, otherwise an exception will not be observed. A second difference is that the ReceiveAllAsync lacks the cancellationToken.ThrowIfCancellationRequested(); line, so a cancellation signal is not guaranteed to have immediate effect. This behavior is consistent with the behavior of the ChannelReader.ReadAllAsync method, which according to Microsoft is "by design".