
Take the last item pushed to an Observable (Sequence)


I have an IObservable<Item> inside a class, and I want to expose a read-only property that returns the most recent item pushed to the observable at any given time, i.e. a single Item value.

If no value has been pushed yet, it should return a default value.

How can I do this without having to subscribe to the observable and having a "backing field"?


Solution

  • Here is another way to define the Value property, in the spirit of Asti's solution.

    private readonly IObservable<Item> _source;
    private readonly IObservable<Item> _lastValue;
    
    public SomeClass() // Constructor
    {
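        _source = /* Initialize the source observable (hot) */;
    
        _lastValue = _source
            .Catch(Observable.Never<Item>())  // On error: swallow it and never terminate
            .Concat(Observable.Never<Item>()) // On completion: never terminate either
            .Publish(default(Item))           // Multicast through a BehaviorSubject<Item> seeded with the default value
            .AutoConnect(0)                   // Connect to the source immediately, without waiting for subscribers
            .FirstAsync();                    // Each subscription yields the current value, then completes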
    }
    
    public Item Value => _lastValue.Wait();
    

    According to its documentation, the Publish operator that accepts an initialValue parameter:

    "Returns a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue. This operator is a specialization of Multicast using a BehaviorSubject<T>."

    The BehaviorSubject<T> is a specialized ISubject<T> that, again quoting the documentation:

    "Represents a value that changes over time. Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications."
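
    The replay behavior described in that quote can be seen in isolation with a minimal sketch. The class name BehaviorSubjectDemo and the int payload are made up for illustration; only BehaviorSubject<T> itself comes from System.Reactive.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Reactive.Subjects;

    public static class BehaviorSubjectDemo // hypothetical name, for illustration only
    {
        public static List<int> Run()
        {
            var received = new List<int>();

            // The seed plays the role of initialValue in Publish(initialValue).
            var subject = new BehaviorSubject<int>(-1);

            subject.OnNext(1);
            subject.OnNext(2);

            // A late subscriber first receives the latest value (2),
            // then every notification that follows.
            using (subject.Subscribe(x => received.Add(x)))
            {
                subject.OnNext(3);
            }

            return received; // contains 2 and 3, in that order
        }
    }
    ```

    The values 1 and -1 are never seen by the late subscriber, which is why a BehaviorSubject<T> works as a "latest value" holder.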

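    The Catch and Concat operators are there to preserve the last value even if the source sequence terminates, whether normally or with an error. Without them the termination would reach the BehaviorSubject<T>, and from then on new subscribers would receive only the completion or error notification, so Value would throw instead of returning the last item.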
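
    To check that the value really survives completion, here is a small self-contained sketch of the same pipeline. It assumes a Subject<int> standing in for the hot source; the names LastValueDemo, Latest and Run are hypothetical.

    ```csharp
    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public static class LastValueDemo // hypothetical name, for illustration only
    {
        // Same operator chain as in the answer, over an int source.
        public static IObservable<int> Latest(IObservable<int> source, int initialValue) =>
            source
                .Catch(Observable.Never<int>())
                .Concat(Observable.Never<int>())
                .Publish(initialValue)
                .AutoConnect(0)
                .FirstAsync();

        public static int[] Run()
        {
            var source = new Subject<int>(); // stand-in for the hot source
            var latest = Latest(source, -1);

            int before = latest.Wait();          // nothing pushed yet: the initial value
            source.OnNext(42);
            int after = latest.Wait();           // the last pushed item
            source.OnCompleted();
            int afterCompletion = latest.Wait(); // still the last pushed item

            return new[] { before, after, afterCompletion };
        }
    }
    ```

    Under these assumptions Run() returns -1, 42, 42. Each Wait() call opens a short-lived subscription to the underlying BehaviorSubject<int>, which replays its current value synchronously.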

    Personally, I would hesitate to use this solution, since a volatile field updated in the Do operator would accomplish the same thing more naturally. I am posting it mainly as a demonstration of what Rx can do.
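
    For comparison, the volatile-field alternative might look roughly like the sketch below. Note that it does use a subscription and a backing field, which the question wanted to avoid. The Item class, the SomeClassWithField name and the constructor parameter are assumptions made so that the example is self-contained; the volatile modifier requires Item to be a reference type.

    ```csharp
    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public sealed class Item // hypothetical payload type
    {
        public int Id { get; set; }
    }

    public sealed class SomeClassWithField : IDisposable // hypothetical name
    {
        private readonly IDisposable _subscription;
        private volatile Item _value; // null (the default) until the first item arrives

        public SomeClassWithField(IObservable<Item> source)
        {
            _subscription = source
                .Do(item => _value = item)      // record every item as it passes through
                .Subscribe(_ => { }, _ => { }); // ignore errors so the last value is kept
        }

        public Item Value => _value;

        public void Dispose() => _subscription.Dispose();
    }
    ```

    Reading Value here never blocks and never subscribes; the cost is that the class owns a subscription that has to be disposed.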