I cannot wrap my head around the correct syntax to do the following:
A simplified example is below:
// Create an observable that never calls onComplete and keeps spitting out data
Observable<Foo> hotObservable = Observable.create(emitter -> {
while (true) {
Foo someData = listenToInputStream();
emitter.onNext(someData);
}
}
// Ensure observable is fired off immediately
ConnectableObservable cObs = hotObservable
.subscribeOn(Schedulers.io())
.publish();
cObs.connect();
Single<Foo> obs1 = cObs
.subscribeOn(Schedulers.io())
.doOnSubscribe(x -> triggerObs1EventToBeSentToInputStream)
.filter(someFilter)
.singleOrError();
obs1.subscribe(someConsumer);
Single<Bar> obs2 = cObs
.subscribeOn(Schedulers.io())
.doOnSubscribe(x -> triggerObs2EventToBeSentToInputStream)
.filter(someOtherFilter)
.map(fooToBar)
.singleOrError();
obs2.subscribe(someOtherConsumer);
I see that firstOrError() works but not singleOrError()/lastOrError()/.takeLast(1). Is there a way to get the latest that matches the filter criteria without blocking/hanging?
FWIW, if I do .take(1).singleOrError() it passes but I assume that's the same as firstOrError(). I'm looking for the most recent data emitted that matches that observers filter.
I also have other Observers that listen to any number/types of data emitted from the hot observable so that is why I am invoking doOnSubscribe for these particular observers instead of integrating the input stream directly into the observers themselves.
Following on Skynets idea of Subject, I think it kinda works if I use a PublishSubject as a middleman. Currently it's kinda working like a take() but I guess I could expand upon this to be more flexible and return 1 to N items.
Example:
PublishSubject<Foo> pSubj = PublishSubject.create();
cObjs
.filter(getCorrectData)
.doOnSubscribe(x -> triggerEventToBeSentToInputStream)
.subscribe(x -> {
pSubj.onNext(x);
pSubj.complete();
});
Single<Foo> obs1 = pSubj
.subscribeOn(Schedulers.io())
//etc