Tags: system.reactive, reactive-programming, reactive-extensions-js

Reactive Extensions: How to create a placeholder observable?


I have a method, getObs(), that returns an observable, which should be shared by all callers. However, that observable might not exist when somebody calls getObs(), and creating it is an async operation, so my idea was to return a placeholder observable that is replaced with the real observable once it is created.

My basic attempt goes something like this:

var createSubject = new Rx.Subject();
var placeholder = createSubject.switchLatest();

I return placeholder whenever the real observable does not yet exist when getObs() is called. Once the real observable has been created, I call createSubject.onNext(realObservable), which passes it to switchLatest(), which in turn unwraps it for any subscribers.
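To make the mechanics concrete, here is a minimal plain-JavaScript sketch of what those two lines do. SimpleSubject and the switchLatest function are hypothetical stand-ins written only for illustration; they are not the Rx implementations, and they ignore errors and completion.

```javascript
// Minimal stand-ins for illustration only; these are NOT the Rx implementations.

// A hot subject: forwards each value to whoever is subscribed at that moment.
function SimpleSubject() {
    this.observers = [];
}
SimpleSubject.prototype.subscribe = function (onNext) {
    var observers = this.observers;
    observers.push(onNext);
    return function unsubscribe() {
        var i = observers.indexOf(onNext);
        if (i !== -1) { observers.splice(i, 1); }
    };
};
SimpleSubject.prototype.onNext = function (value) {
    // Iterate over a copy so observers may unsubscribe during delivery.
    this.observers.slice().forEach(function (onNext) { onNext(value); });
};

// switchLatest stand-in: stay subscribed to the most recent inner source only.
function switchLatest(sources) {
    return {
        subscribe: function (onNext) {
            var unsubscribeInner = null;
            var unsubscribeOuter = sources.subscribe(function (inner) {
                if (unsubscribeInner) { unsubscribeInner(); }
                unsubscribeInner = inner.subscribe(onNext);
            });
            return function unsubscribe() {
                unsubscribeOuter();
                if (unsubscribeInner) { unsubscribeInner(); }
            };
        }
    };
}

var createSubject = new SimpleSubject();
var placeholder = switchLatest(createSubject);

var received = [];
placeholder.subscribe(function (x) { received.push(x); });  // subscribes early

var realObservable = new SimpleSubject();                    // created later
createSubject.onNext(realObservable);                        // swap it in
realObservable.onNext(1);
realObservable.onNext(2);
// received now holds the values pushed through realObservable
```

One thing the model makes visible: an observer that subscribes to placeholder after createSubject.onNext(...) has already fired receives nothing, because a plain subject does not replay earlier values. That is consistent with handing out placeholder only while the real observable does not exist yet.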

However, it does seem like overkill to use a Subject and switchLatest for this purpose, so I am wondering if there is a more direct solution?


Solution

  • If the act of getting the observable itself is asynchronous, you should model that also as an observable.

    For example...

    var getObsAsync = function () {
        return Rx.Observable.create(function (observer) {
            // token identifies the pending async action so it can be cancelled
            var token = startSomeAsyncAction(function (result) {
                    // the async action has completed!
                    var obs = Rx.Observable.fromArray(result.dataArray);
                    token = undefined;
                    observer.onNext(obs);
                    observer.onCompleted();
                }),
                unsubscribeAction = function () {
                    // only cancel if the async action is still pending
                    if (token) {
                        stopSomeAsyncAction(token);
                    }
                };

            return unsubscribeAction;
        });
    };
    
    var getObs = function () { return getObsAsync().switchLatest(); };
    

    And if you want to share a single instance of that observable, but you do not wish to get the observable until someone actually subscribes, then you do:

    // source must be a connectable observable (i.e. the result of publish or replay)
    // will connect the observable the first time an observer subscribes
    // If an action is supplied, then it will call the action with a disposable
    // that can be used to disconnect the observable.
    // idea taken from Rxx project
    Rx.Observable.prototype.prime = function (action) {
        var source = this;
        if (!(source instanceof Rx.Observable) || !source.connect) {
            throw new Error("source must be a connectable observable");
        }
    
        var connection = undefined;
        return Rx.Observable.createWithDisposable(function (observer) {
            var subscription = source.subscribe(observer);
    
            if (!connection) {
                // this is the first observer.  Connect the underlying observable.
                connection = source.connect();
                if (action) {
                    // Call action with a disposable that will disconnect and reset our state
                    var disconnect = function() {
                        connection.dispose();
                        connection = undefined;
                    };
                    action(Rx.Disposable.create(disconnect));
                }
            }
    
            return subscription;
        });
    };
    
    var globalObs = Rx.Observable.defer(getObs).publish().prime();
    

    Now code anywhere can simply use globalObs without worrying about whether the underlying observable exists yet:

    // location 1
    globalObs.subscribe(...);
    
    // location 2
    globalObs.select(...)...subscribe(...);
    

    Notice that no one even needs to call getObs directly, because you have set up a global observable that will (via defer) call getObs for you the first time someone subscribes.
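
The laziness that defer provides can be sketched in plain JavaScript. The defer function and getObsStub below are hypothetical stand-ins for illustration, not the Rx operator or the real getObs; the sketch only assumes that a source is an object with a subscribe method.

```javascript
// Illustrative stand-ins only; not the Rx operators.

// defer stand-in: run the factory at subscription time, not at definition time.
function defer(factory) {
    return {
        subscribe: function (onNext) {
            return factory().subscribe(onNext);
        }
    };
}

// Hypothetical factory standing in for getObs; counts how often it runs.
var factoryCalls = 0;
function getObsStub() {
    factoryCalls += 1;
    return {
        subscribe: function (onNext) {
            onNext("a");
            onNext("b");
            return function unsubscribe() {};
        }
    };
}

var lazyObs = defer(getObsStub);
var callsBeforeSubscribe = factoryCalls;   // the factory has not run yet

var seen = [];
lazyObs.subscribe(function (x) { seen.push(x); });
// The factory ran exactly once, triggered by the subscription above.
```

On its own, a deferred source runs the factory again for every new subscription. In the answer above it is the publish().prime() step that keeps a single shared subscription to the deferred source, so getObs runs once for all observers.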