c# async-await system.reactive

How can I make the Reactive Extensions onNext, onError, and onCompleted delegates awaitable with async/await?


I have created the extension method below, with which I was able to make an IObservable.Subscribe awaitable, but only by wrapping the whole subscription.

A big issue with current async programming in C#, and with the code below, is that the onNext, onError, and onCompleted delegates are not awaitable.

Is there a way to work around this and write async/await code? Is there an extension method that I missed?

public static Task Observe<T>(this IObservable<T> observable,
        Action<T> onNext,
        Func<Exception, EventWaitHandle, ExceptionDispatchInfo> onError,
        Action<EventWaitHandle, CancellationToken> onCompleted,
        CancellationToken cancellationToken
    ) where T : class
    {
        var compositeDisposables = new CompositeDisposable();
        var waitHandle = new ManualResetEvent(false);
        compositeDisposables.Add(waitHandle);

        var disposable = observable.Subscribe(
            // This should be await onNext
            onNext, 
            // This should be await onError
            e => onError(e, waitHandle),
            // This should be await onCompleted
            () => onCompleted(waitHandle, cancellationToken));
        
        compositeDisposables.Add(disposable);

        waitHandle.WaitOne();
        compositeDisposables.Dispose();

        return Task.CompletedTask;
    }

I understand there's a solution for an async onNext, but it doesn't cover onError and onCompleted.


Solution

  • You can already await an observable. Consider this code:

    async Task Main()
    {
        IObservable<int> observable = Observable.Range(0, 10);
        
        int value = await observable;
        
        Console.WriteLine(value);
    }
    

    That writes 9 to the console, which is the last value of the observable.

    If you want the first one, just do this:

    async Task Main()
    {
        IObservable<int> observable = Observable.Range(0, 10);
        
        int value = await observable.Take(1);
        
        Console.WriteLine(value);
    }
    

    That produces 0.

    If you want to await all of the values then try this:

    async Task Main()
    {
        IObservable<int> observable = Observable.Range(0, 10);
        
        int[] values = await observable.ToArray();
        
        Console.WriteLine(String.Join(", ", values));
    }
    

    That produces 0, 1, 2, 3, 4, 5, 6, 7, 8, 9.

    Be mindful that if your observable completes without producing a value, the await throws an exception. And if the observable never completes, the await never returns.
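
    Both cases can be guarded against. The following is a minimal sketch, assuming the System.Reactive package is referenced; the names AwaitGuards, LastOrFallback, and CompletesBeforeIdleTimeout are made up for illustration. DefaultIfEmpty supplies a fallback element so that an empty sequence no longer throws, and Timeout fails the await with a TimeoutException when no notification arrives within the given period.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical helper class, not part of System.Reactive.
public static class AwaitGuards
{
    // Awaiting an empty sequence throws InvalidOperationException.
    // DefaultIfEmpty emits the fallback instead, so the await returns it.
    public static async Task<int> LastOrFallback(IObservable<int> source, int fallback)
    {
        return await source.DefaultIfEmpty(fallback);
    }

    // Timeout is applied per notification: if the gap before the next
    // notification exceeds idleLimit, the await throws TimeoutException
    // instead of staying pending forever.
    public static async Task<bool> CompletesBeforeIdleTimeout(
        IObservable<int> source, TimeSpan idleLimit)
    {
        try
        {
            await source.DefaultIfEmpty().Timeout(idleLimit);
            return true;
        }
        catch (TimeoutException)
        {
            return false;
        }
    }
}
```

    Note that Timeout measures the gap between notifications, not the total running time, so a source that keeps emitting forever would still never complete the await.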


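    If you need to run asynchronous code for each value, you can wrap the call in Observable.FromAsync inside a query:

    async Task Main()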
    {
        IObservable<int> observable =
            from n in Observable.Range(0, 10)
            from p in Observable.FromAsync(() => OnNext(n))
            select n;
    
        int value = await observable;
    
        Console.WriteLine(value);
    }
    
    public async Task OnNext<T>(T value)
    {
        await Task.Delay(TimeSpan.FromSeconds(1.0));
    }
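
    Building on the same idea, onError and onCompleted can be made awaitable by letting the await stand in for the subscription: an OnError notification surfaces as an exception at the await, and completion is simply the code that runs after it. The SubscribeAsync method below is a hypothetical helper, not part of System.Reactive, sketched under the assumption that the handlers should run one at a time. It uses Select plus Concat rather than the SelectMany query above, which may run the handlers concurrently.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical extension class, not part of System.Reactive.
public static class ObservableAsyncExtensions
{
    public static async Task SubscribeAsync<T>(
        this IObservable<T> source,
        Func<T, Task> onNext,
        Func<Exception, Task> onError,
        Func<Task> onCompleted)
    {
        try
        {
            // Concat subscribes to each inner FromAsync only after the previous
            // one has finished, so the onNext handlers do not overlap.
            // DefaultIfEmpty keeps the await from throwing on an empty source.
            await source
                .Select(x => Observable.FromAsync(() => onNext(x)))
                .Concat()
                .DefaultIfEmpty();
        }
        catch (Exception ex)
        {
            // Reached for an OnError from the source or a failure inside onNext.
            await onError(ex);
            return;
        }

        // Reached only after the source completed and every onNext finished.
        await onCompleted();
    }
}
```

    A call might look like this:

```csharp
await Observable.Range(0, 3).SubscribeAsync(
    async n => { await Task.Delay(100); Console.WriteLine(n); },
    async ex => { await Task.Delay(100); Console.WriteLine(ex.Message); },
    async () => { await Task.Delay(100); Console.WriteLine("done"); });
```

    Exceptions thrown by onNext are routed to onError as well, while exceptions thrown by onError or onCompleted propagate to the caller. Concat queues elements that arrive while a handler is still running, so a fast source with slow handlers can build up a backlog.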