I have a method, getObs(), that returns an observable, which should be shared by all callers. However, that observable might not exist when somebody calls getObs(), and creating it is an async operation, so my idea was to return a placeholder observable that is replaced with the real observable once it is created.
My basic attempt goes something like this:
var createSubject = new Rx.Subject();
var placeholder = createSubject.switchLatest();
Here I can return placeholder if the real observable does not exist when getObs() is called. When the real observable is created, I call createSubject.onNext(realObservable), which passes it to switchLatest(), which in turn unwraps it for any subscribers.
However, it does seem like overkill to use a Subject and switchLatest for this purpose, so I am wondering if there is a more direct solution?
If the act of getting the observable itself is asynchronous, you should model that also as an observable.
For example...
var getObsAsync = function () {
    return Rx.Observable.create(function (observer) {
        // start the async work; token identifies it so it can be cancelled
        var token = startSomeAsyncAction(function (result) {
            // the async action has completed!
            var obs = Rx.Observable.fromArray(result.dataArray);
            token = undefined;
            observer.onNext(obs);
            observer.onCompleted();
        });
        // called on unsubscribe; cancels the action if it is still pending
        var unsubscribeAction = function () {
            if (token) {
                stopSomeAsyncAction(token);
            }
        };
        return unsubscribeAction;
    });
};
var getObs = function () { return getObsAsync().switchLatest(); };
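To make the flattening step concrete, here is a minimal, dependency-free sketch of what switchLatest is doing for the placeholder. It is not the RxJS implementation: simpleSubject and flattenLatest are hypothetical stand-ins, and an "observable" here is assumed to be any object with subscribe(onNext) that returns an object with dispose().

```javascript
// Hypothetical stand-ins, not RxJS: an "observable" here is any object with
// subscribe(onNext) that returns { dispose }.
function simpleSubject() {
    var observers = [];
    return {
        onNext: function (value) {
            // Copy the list so disposing during delivery is safe.
            observers.slice().forEach(function (o) { o(value); });
        },
        subscribe: function (onNext) {
            observers.push(onNext);
            return {
                dispose: function () {
                    var i = observers.indexOf(onNext);
                    if (i !== -1) { observers.splice(i, 1); }
                }
            };
        }
    };
}

// Forwards values from the most recent inner source emitted by `outer`.
function flattenLatest(outer) {
    return {
        subscribe: function (onNext) {
            var innerSub = null;
            var outerSub = outer.subscribe(function (inner) {
                if (innerSub) { innerSub.dispose(); } // stop listening to the old source
                innerSub = inner.subscribe(onNext);
            });
            return {
                dispose: function () {
                    outerSub.dispose();
                    if (innerSub) { innerSub.dispose(); }
                }
            };
        }
    };
}

// Usage: subscribe to the placeholder first, supply the real source later.
var sources = simpleSubject();
var placeholder = flattenLatest(sources);
var real = simpleSubject();
var seen = [];
placeholder.subscribe(function (v) { seen.push(v); });
sources.onNext(real); // the real source becomes available
real.onNext("a");
real.onNext("b");
console.log(seen);
```

The subscriber never sees the inner source itself, only its values, which is why callers can be handed the placeholder before the real observable exists.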
And if you want to share a single instance of that observable, but you do not wish to get the observable until someone actually subscribes, then you do:
// source must be a Connectable Observable (ie the result of Publish or Replay)
// will connect the observable the first time an observer subscribes
// If an action is supplied, then it will call the action with a disposable
// that can be used to disconnect the observable.
// idea taken from Rxx project
Rx.Observable.prototype.prime = function (action) {
    var source = this;
    if (!(source instanceof Rx.Observable) || !source.connect) {
        throw new Error("source must be a connectable observable");
    }
    var connection = undefined;
    return Rx.Observable.createWithDisposable(function (observer) {
        var subscription = source.subscribe(observer);
        if (!connection) {
            // this is the first observer. Connect the underlying observable.
            connection = source.connect();
            if (action) {
                // Call action with a disposable that will disconnect and reset our state
                var disconnect = function () {
                    connection.dispose();
                    connection = undefined;
                };
                action(Rx.Disposable.create(disconnect));
            }
        }
        return subscription;
    });
};
var globalObs = Rx.Observable.defer(getObs).publish().prime();
Now code anywhere can simply use globalObs without worrying about when the underlying observable gets created:
// location 1
globalObs.subscribe(...);
// location 2
globalObs.select(...)...subscribe(...);
Notice that no one actually needs to call getObs directly, because you have set up a global observable that will (via defer) call getObs for you when someone subscribes.
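The lazy, shared behaviour can be illustrated without Rx at all. The sketch below is only an approximation of what defer, publish and prime give you together: lazyShared is a hypothetical helper, and its start argument is an assumed function that receives an emit callback and begins the async work. It never disconnects, which roughly corresponds to calling prime without an action.

```javascript
// Hypothetical helper, not part of RxJS: starts the underlying work on the
// first subscribe and shares its values with every subscriber.
function lazyShared(start) {
    var observers = [];
    var started = false;

    function emit(value) {
        // Copy the list so disposing during delivery is safe.
        observers.slice().forEach(function (o) { o(value); });
    }

    return {
        subscribe: function (onNext) {
            observers.push(onNext);
            if (!started) {
                // First subscriber: kick off the underlying work exactly once.
                started = true;
                start(emit);
            }
            return {
                dispose: function () {
                    var i = observers.indexOf(onNext);
                    if (i !== -1) { observers.splice(i, 1); }
                }
            };
        }
    };
}

// Usage: two call sites, one underlying start.
var startCount = 0;
var shared = lazyShared(function (emit) {
    startCount += 1;
    setTimeout(function () { emit("ready"); }, 0);
});
shared.subscribe(function (v) { console.log("location 1:", v); });
shared.subscribe(function (v) { console.log("location 2:", v); });
```

Nothing runs until the first subscribe call, and the second subscriber reuses the work already in flight instead of starting it again.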