Tags: c#, asynchronous, .net-core, iasyncenumerable, system.threading.channels

System.Threading.Channels ReadAsync() method is blocking execution


Overview

I am attempting to write an IAsyncEnumerable<T> wrapper around an IObserver<T> interface. At first I used a BufferBlock<T> as the backing data store, but performance testing and research showed it to be a fairly slow type, so I decided to give the System.Threading.Channels.Channel type a go. I ran into a similar problem with my BufferBlock implementation, but this time I'm not sure how to resolve it.

Problem

My GetAsyncEnumerator() loop gets blocked by the await _channel.Reader.WaitToReadAsync(token) call if my IObserver<T>.OnNext() method hasn't written to the _channel yet. What is the correct way to wait for a value to be available to yield in this context without blocking program execution?

Implementation

public sealed class ObserverAsyncEnumerableWrapper<T> : IAsyncEnumerable<T>,
    IObserver<T>, IDisposable
{
    private readonly IDisposable _unsubscriber;
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();

    private bool _producerComplete;

    public ObserverAsyncEnumerableWrapper(IObservable<T> provider)
    {
        _unsubscriber = provider.Subscribe(this);
    }

    public async void OnNext(T value)
    {
        Log.Logger.Verbose("Adding value to Channel.");
        await _channel.Writer.WriteAsync(value);
    }

    public void OnError(Exception error)
    {
        _channel.Writer.Complete(error);
    }

    public void OnCompleted()
    {
        _producerComplete = true;
    }

    public async IAsyncEnumerator<T> GetAsyncEnumerator(
        [EnumeratorCancellation] CancellationToken token = new CancellationToken())
    {
        Log.Logger.Verbose("Starting async iteration...");
        while (await _channel.Reader.WaitToReadAsync(token) || !_producerComplete)
        {
            Log.Logger.Verbose("Reading...");
            while (_channel.Reader.TryRead(out var item))
            {
                Log.Logger.Verbose("Yielding item.");
                yield return item;
            }
            Log.Logger.Verbose("Awaiting more items.");
        }
        Log.Logger.Verbose("Iteration Complete.");
        _channel.Writer.Complete();
    }

    public void Dispose()
    {
        _channel.Writer.Complete();
        _unsubscriber?.Dispose();
    }
}
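
For comparison, here is a minimal sketch of the same wrapper with simpler completion handling. It is not the fix for the blocking described in this question; it only shows one way the channel could signal completion by itself, so that the reader loop does not need a separate _producerComplete flag. It assumes .NET Core 3.0 or later (for ChannelReader<T>.ReadAllAsync). The names ChannelObserverEnumerable and RangeObservable are hypothetical and not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;

public sealed class ChannelObserverEnumerable<T> : IAsyncEnumerable<T>, IObserver<T>, IDisposable
{
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
    private readonly IDisposable _unsubscriber;

    public ChannelObserverEnumerable(IObservable<T> provider)
    {
        _unsubscriber = provider.Subscribe(this);
    }

    // On an unbounded channel TryWrite succeeds until the writer is completed,
    // so OnNext does not have to be async void.
    public void OnNext(T value) => _channel.Writer.TryWrite(value);

    // TryComplete is safe to call more than once; Complete throws on the second call.
    public void OnError(Exception error) => _channel.Writer.TryComplete(error);

    public void OnCompleted() => _channel.Writer.TryComplete();

    // ReadAllAsync yields items until the writer is completed and the channel is drained.
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
        => _channel.Reader.ReadAllAsync(token).GetAsyncEnumerator(token);

    public void Dispose()
    {
        _channel.Writer.TryComplete();
        _unsubscriber?.Dispose();
    }
}

// Hypothetical source used only for demonstration:
// pushes 1..count synchronously during Subscribe, then completes.
public sealed class RangeObservable : IObservable<int>
{
    private readonly int _count;

    public RangeObservable(int count) => _count = count;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (var i = 1; i <= _count; i++)
        {
            observer.OnNext(i);
        }
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}
```

Consuming it with await foreach (var item in new ChannelObserverEnumerable<int>(new RangeObservable(3))) should end the loop once the producer has called OnCompleted() and the buffered items have been read.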

Additional Context

It shouldn't matter, but at runtime the IObservable<T> instance passed into the constructor is a CimAsyncResult returned from async calls made to the Microsoft.Management.Infrastructure APIs. Those make use of the Observer design pattern, which I'm trying to wrap with the fancy new async enumeration pattern.

Edit

Updated with logging to the debugger output and made my OnNext() method async/await, as one commenter suggested. The output shows that execution never enters the while() loop.


Solution

  • Further up the call stack I was calling the async method synchronously via GetAwaiter().GetResult().

    Yup, that's a problem.

    I did this because in one case I wanted to get the data from within a constructor. I changed that implementation to execute the call using Task.Run() and now the iterators run flawlessly with both implementations.

    There are better solutions than blocking on asynchronous code. Using Task.Run is one way to avoid the deadlock, but you still end up with a sub-par user experience (I'm assuming yours is a UI application, since there is a SynchronizationContext).

    If the asynchronous enumerator is used to load data for display, then a better solution is to (synchronously) initialize the UI to a "Loading..." state, and then update that state as the data is loaded asynchronously. If the asynchronous enumerator is used for something else, you may find some appropriate alternative patterns in my async constructors blog post.
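
    The "Loading..." approach described above can be sketched roughly as follows. This is only an illustration under assumptions: DataViewModel, DemoSource, Status, Items and Initialization are hypothetical names, and a real UI view model would also raise change notifications (for example via INotifyPropertyChanged) so the view updates. The constructor starts the asynchronous load and returns immediately instead of blocking with GetAwaiter().GetResult().

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class DataViewModel
{
    // Set synchronously, so the UI has something to show right away.
    public string Status { get; private set; } = "Loading...";

    public List<int> Items { get; } = new List<int>();

    // Callers that need the loaded data can await this task.
    public Task Initialization { get; }

    public DataViewModel(IAsyncEnumerable<int> source)
    {
        // Start the load without blocking the constructor.
        Initialization = LoadAsync(source);
    }

    private async Task LoadAsync(IAsyncEnumerable<int> source)
    {
        try
        {
            await foreach (var item in source)
            {
                Items.Add(item);
            }
            Status = "Loaded";
        }
        catch (Exception ex)
        {
            // Surface the failure as state instead of leaving the task faulted.
            Status = "Failed: " + ex.Message;
        }
    }
}

public static class DemoSource
{
    // Hypothetical stand-in for the real asynchronous enumerator.
    public static async IAsyncEnumerable<int> CountAsync(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Delay(10);
            yield return i;
        }
    }
}
```

    With this shape, new DataViewModel(DemoSource.CountAsync(3)) returns while Status is still "Loading...", and awaiting Initialization afterwards observes the loaded items without ever blocking on asynchronous code.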