Search code examples
c#.netsystem.reactiverx.net

How to create an Observable that caches each of the calculated items? (equivalence of Lazy<T>)


I would like to create a sequence (Observable<T>) that is able to cache the items, so the calculations inside the pipeline are processed only once. For instance:

var obs = Observable
.Range(1, 100)
.SelectMany(x => GetItemAsync(x));

I would like that having 2 subscribers, the results of the GetItemAsync are cached so the 2nd subscriber will get them from the cached values, so the method shouldn't be called at all for any subsequent subscription.

I would like to do something similar to what Lazy<T> does, but with Reactive Extensions


Solution

  • The Replay operator returns an IConnectableObservable<T>, which is an IObservable<T> with an extra Connect method. This observable can be subscribed by any number of observers. It propagates to all observers all past and future notifications coming from the underlying observable, starting from the time it was connected and ending to the time it was disconnected. Here is an example:

    var connectable = Observable
        .Range(1, 100)
        .SelectMany(x => GetItemAsync(x))
        .Replay();
    
    var subscription1 = connectable.Subscribe(x => Console.WriteLine(x))
    var subscription2 = connectable.Subscribe(x => Console.WriteLine(x))
    
    var connection = connectable.Connect(); // Subscribe to the 'SelectMany' observable
    
    //...
    
    connection.Dispose() // Unsubscribe from the 'SelectMany' observable
    

    This example demonstrates how to connect manually to the underlying observable, which is important when using other multicast operators like the Publish. But it's less important with the Replay operator because of its replay functionality: it doesn't matter whether it will be subscribed before or after it connects to the underlying observable. So you may choose to avoid connecting manually, and use one of the two available auto-connect operators:

    1. RefCount: connects to the underlying observable when it is subscribed for the first time, and disconnects when its last subscriber unsubscribes.

    2. AutoConnect(0): connects to the underlying observable immediately, and stays connected forever.

    Example:

    var observable = Observable
        .Range(1, 100)
        .SelectMany(x => GetItemAsync(x))
        .Replay()
        .AutoConnect(0);
    
    // call observable.Subscribe any time and as many times you want
    

    Both RefCount and AutoConnect also disconnect automatically when the underlying observable completes successfully or with an error.

    Connecting and disconnecting more than once is not a supported scenario, and may produce unexpected results. If you want to disconnect and reconnect, you'd better use a different Replay connectable each time.