c# system.reactive rx.net

Why does Observable.ToEnumerable() not produce values until the underlying sequence completes?


Running the following code

    foreach(var i in
    Observable
        .Range(1, 3)
        .Do(Console.WriteLine)
        .ToEnumerable())
        Console.WriteLine("Fin:" + i);

I'm getting this output:

    1
    2
    3
    Fin:1
    Fin:2
    Fin:3

The question is - why does ToEnumerable cache all the values and provide them only after the source sequence completes? Is it somehow related to "leaving the monad"?


Solution

  • If you dig into the source code of the Rx library you'll see that the ToEnumerable operator is implemented basically like this:

    public static IEnumerable<T> ToEnumerable<T>(this IObservable<T> source)
    {
        using var enumerator = new GetEnumerator<T>();
        enumerator.Run(source);
        while (enumerator.MoveNext()) yield return enumerator.Current;
    }
    

    ...where the GetEnumerator<T> is a class defined in this file. This class is an IEnumerator<T> and an IObserver<T>. It has an internal _queue (ConcurrentQueue<T>) where it stores the received items. The most interesting methods are the Run, OnNext and MoveNext:

    public IEnumerator<T> Run(IObservable<T> source)
    {
        _subscription.Disposable = source.Subscribe(this);
        return this;
    }
    
    public void OnNext(T value)
    {
        _queue.Enqueue(value);
        _gate.Release();
    }
    
    public bool MoveNext()
    {
        _gate.Wait();
        if (_queue.TryDequeue(out _current)) return true;
        _error?.Throw();
        return false;
    }
    

    In your code, when you start the foreach loop, the Run method runs and subscribes to the Range+Do sequence. This sequence emits all of its elements synchronously during the subscription. The OnNext method is invoked for each emitted element, so all of the elements are enqueued in the _queue before Run returns. Only after the Run method completes does the while loop start dequeuing and yielding the queued elements. That's why you see all the side effects of the Do operator happening before any iteration of your foreach loop.
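
    The ordering described above can be reproduced without the Rx library. The following is only a minimal sketch that uses BCL types alone: SyncRange, QueueingObserver<T>, TryTake and ToEnumerableSketch are hypothetical names invented for this illustration, not Rx types, and the sketch leaves out the error propagation and disposal that the real implementation handles. It assumes, as described above, that the source pushes every element from inside Subscribe.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;

    // Hypothetical stand-in for Range+Do: pushes every value synchronously inside Subscribe.
    sealed class SyncRange : IObservable<int>
    {
        private readonly int _start, _count;
        public SyncRange(int start, int count) { _start = start; _count = count; }

        public IDisposable Subscribe(IObserver<int> observer)
        {
            for (var i = _start; i < _start + _count; i++)
            {
                Console.WriteLine(i);   // plays the role of Do(Console.WriteLine)
                observer.OnNext(i);
            }
            observer.OnCompleted();
            return new NoopDisposable();
        }

        private sealed class NoopDisposable : IDisposable { public void Dispose() { } }
    }

    // Simplified model of the queue-and-gate observer (no error handling, no disposal).
    sealed class QueueingObserver<T> : IObserver<T>
    {
        private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
        private readonly SemaphoreSlim _gate = new SemaphoreSlim(0);

        public void OnNext(T value) { _queue.Enqueue(value); _gate.Release(); }
        public void OnError(Exception error) => _gate.Release();
        public void OnCompleted() => _gate.Release();

        // Blocks until an item or a termination signal arrives; false means the queue is drained.
        public bool TryTake(out T value)
        {
            _gate.Wait();
            return _queue.TryDequeue(out value);
        }
    }

    static class Demo
    {
        static IEnumerable<T> ToEnumerableSketch<T>(IObservable<T> source)
        {
            var observer = new QueueingObserver<T>();
            source.Subscribe(observer);   // a synchronous source makes all its OnNext calls here
            while (observer.TryTake(out var item)) yield return item;
        }

        static void Main()
        {
            foreach (var i in ToEnumerableSketch(new SyncRange(1, 3)))
                Console.WriteLine("Fin:" + i);
        }
    }
    ```

    Running this sketch prints 1, 2, 3 followed by Fin:1, Fin:2, Fin:3, the same order as in the question: the Subscribe call returns only after the source has pushed everything, so the first yield return cannot happen any earlier.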

    The Rx library includes another operator similar to the ToEnumerable, the Next operator, with this signature:

    // Returns an enumerable sequence whose enumeration blocks until the next element
    // in the source observable sequence becomes available. Enumerators on the resulting
    // sequence will block until the next element becomes available.
    public static IEnumerable<T> Next<T>(this IObservable<T> source);
    

    According to my experiments, this operator doesn't do what you want either.