Both Queue and ConcurrentQueue implement IEnumerable but not IAsyncEnumerable. Is there a standard class, or a class available on NuGet, which implements IAsyncEnumerable such that, if the queue is empty, the result of MoveNextAsync does not complete until the next item is added to the queue?
If you are using the .NET Core platform there are at least two built-in options:
1. The System.Threading.Tasks.Dataflow.BufferBlock<T> class, part of the TPL Dataflow library. It doesn't implement IAsyncEnumerable<T> natively¹, but it exposes the awaitable OutputAvailableAsync() method, making it trivial to implement a ToAsyncEnumerable extension method.
2. The System.Threading.Channels.Channel<T> class, the core component of the Channels library. It exposes an IAsyncEnumerable<T> implementation via its Reader.ReadAllAsync()² method.
Both classes are also available for .NET Framework, by installing a NuGet package (a different one for each class).
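To illustrate the second option, here is a minimal sketch of a Channel<T> used as an awaitable queue. The ChannelQueueDemo class and its RunAsync method are hypothetical names made up for this example, and the 100 ms delay is only there to show that the consumer waits asynchronously while the channel is empty.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo class, not part of any library.
public static class ChannelQueueDemo
{
    public static async Task<List<int>> RunAsync()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();

        // Producer: adds three items with a pause between them, then
        // completes the writer so that the consumer's loop can end.
        Task producer = Task.Run(async () =>
        {
            for (int i = 1; i <= 3; i++)
            {
                await Task.Delay(100);
                await channel.Writer.WriteAsync(i);
            }
            channel.Writer.Complete();
        });

        // Consumer: while the channel is empty, the enumeration waits
        // asynchronously until an item arrives or the writer completes.
        var received = new List<int>();
        await foreach (int item in channel.Reader.ReadAllAsync())
        {
            received.Add(item);
        }

        await producer; // Observe any exception thrown by the producer
        return received;
    }
}
```

Without the call to channel.Writer.Complete() the await foreach loop would never end, which is usually what you want from a long-lived queue.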
An implementation of IAsyncEnumerable<T> for BufferBlock<T>:
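using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Extension method: it must be declared inside a static class.
public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
    this IReceivableSourceBlock<T> source,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // Wait asynchronously until output is available or the block completes.
    while (await source.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        // Drain everything that is currently buffered.
        while (source.TryReceive(out T item))
        {
            yield return item;
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
    await source.Completion.ConfigureAwait(false); // Propagate possible exception
}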
¹ Not available up to and including .NET 5. Available natively from .NET 6 onward.
² Not available for .NET Framework, but easy to implement in a similar way.
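As a rough sketch of what footnote ² has in mind, the same two-loop pattern can be applied to a ChannelReader<T>. The class name ChannelReaderExtensions and the method name ToAsyncEnumerable are hypothetical (the method name is chosen so that it does not clash with the built-in ReadAllAsync). The sketch assumes C# 8 or later and that IAsyncEnumerable<T> is available on your target, for example through the Microsoft.Bcl.AsyncInterfaces package.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

// Hypothetical class and method names, for illustration only.
public static class ChannelReaderExtensions
{
    public static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
        this ChannelReader<T> reader,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Wait asynchronously until data is available or the writer completes.
        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            // Drain everything that is currently buffered.
            while (reader.TryRead(out T item))
            {
                yield return item;
            }
        }
    }
}
```

Usage is the same as with the built-in method: await foreach (var item in channel.Reader.ToAsyncEnumerable()) { ... }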
.NET 6 update: This functionality is now natively available through the ReceiveAllAsync extension method, which has this signature:
public static IAsyncEnumerable<TOutput> ReceiveAllAsync<TOutput>(
    this IReceivableSourceBlock<TOutput> source,
    CancellationToken cancellationToken = default);
There are two differences, though, between the new ReceiveAllAsync API (source code) and the ToAsyncEnumerable shown above. The most important difference is that the exception of the source dataflow block is not propagated. So it's up to the consumer of the ReceiveAllAsync API to remember to await the Completion of the source after completing the enumeration, otherwise a failure of the block will go unobserved. The second difference is that ReceiveAllAsync lacks the cancellationToken.ThrowIfCancellationRequested(); line, so a cancellation signal is not guaranteed to have immediate effect. This behavior is consistent with the behavior of the ChannelReader.ReadAllAsync method, which according to Microsoft is "by design".
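Here is a minimal sketch of the consumer-side pattern described above, assuming .NET 6 or later: enumerate with ReceiveAllAsync, then await the Completion of the source so that a failure of the block is not silently lost. The DataflowDrain class and its DrainAsync method are hypothetical names used only for this example.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, for illustration only.
public static class DataflowDrain
{
    public static async Task<List<T>> DrainAsync<T>(IReceivableSourceBlock<T> source)
    {
        var items = new List<T>();

        // As described above, this loop is expected to end without throwing,
        // even when the source block has completed in a faulted state.
        await foreach (T item in source.ReceiveAllAsync())
        {
            items.Add(item);
        }

        // Awaiting the Completion surfaces the exception of the block, if any.
        await source.Completion.ConfigureAwait(false);
        return items;
    }
}
```

If the source block has faulted, the task returned by DrainAsync fails with the block's exception instead of returning a list that merely looks complete.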