rx-java, rx-java2

Subscribing to a Hot Observable without onComplete


I cannot wrap my head around the correct syntax to do the following:

  1. Create a hot Observable whose source runs only once and never calls onComplete.
  2. Have several observers subscribe to it and react to the data it emits, without requiring the hot Observable to call onComplete.

A simplified example is below:

// Create an observable that never calls onComplete and keeps emitting data
Observable<Foo> hotObservable = Observable.create(emitter -> {
   // Stop reading once the downstream has disposed
   while (!emitter.isDisposed()) {
     Foo someData = listenToInputStream();
     emitter.onNext(someData);
   }
});

// Connect right away so the source starts emitting immediately
ConnectableObservable<Foo> cObs = hotObservable
  .subscribeOn(Schedulers.io())
  .publish();
cObs.connect();

Single<Foo> obs1 = cObs
 .subscribeOn(Schedulers.io())
 .doOnSubscribe(x -> triggerObs1EventToBeSentToInputStream())
 .filter(someFilter)
 .singleOrError();
obs1.subscribe(someConsumer);

Single<Bar> obs2 = cObs
 .subscribeOn(Schedulers.io())
 .doOnSubscribe(x -> triggerObs2EventToBeSentToInputStream())
 .filter(someOtherFilter)
 .map(fooToBar)
 .singleOrError();
obs2.subscribe(someOtherConsumer);

I see that firstOrError() works, but singleOrError(), lastOrError(), and takeLast(1) never emit anything (they appear to wait for onComplete). Is there a way to get the latest item that matches the filter criteria without blocking or hanging?

FWIW, .take(1).singleOrError() does work, but I assume that is the same as firstOrError(). What I'm looking for is the most recent item emitted that matches that observer's filter.
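
To make the difference concrete, here is a minimal sketch, assuming RxJava 2 (`io.reactivex`) and using a PublishSubject as a stand-in for the never-completing hot source. The class name, the Integer payload, and the even-number filter are made up for illustration and are not part of the code above.

```java
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

public class TerminalOperatorsDemo {
    // Returns the order in which each operator delivered its result.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Hypothetical stand-in for the hot source; it stays open until onComplete().
        PublishSubject<Integer> hot = PublishSubject.create();

        // Succeeds as soon as the first matching item arrives.
        hot.filter(n -> n % 2 == 0).firstOrError()
           .subscribe(v -> log.add("first=" + v));
        // take(1) completes the chain itself, so singleOrError() can succeed.
        hot.filter(n -> n % 2 == 0).take(1).singleOrError()
           .subscribe(v -> log.add("take1single=" + v));
        // Has to wait for the upstream to complete before it can emit.
        hot.filter(n -> n % 2 == 0).lastOrError()
           .subscribe(v -> log.add("last=" + v));

        hot.onNext(1);
        hot.onNext(2);
        hot.onNext(4);
        log.add("still open");
        hot.onComplete();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch lastOrError() only delivers a value after onComplete() is called on the source, which is the situation a never-completing hot Observable cannot reach.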

I also have other observers that listen for any number and type of items emitted from the hot observable. That is why I invoke doOnSubscribe for these particular observers instead of wiring the input stream directly into the observers themselves.


Solution

  • Following Skynet's idea of using a Subject, it seems to work if I use a PublishSubject as a middleman. Currently it behaves much like take(1), but I could probably extend it to be more flexible and return 1 to N items.

    Example:

    PublishSubject<Foo> pSubj = PublishSubject.create();
    cObs
    .filter(getCorrectData)
    .doOnSubscribe(x -> triggerEventToBeSentToInputStream())
    .subscribe(x -> {
      // Forward the first matching item, then complete the subject
      pSubj.onNext(x);
      pSubj.onComplete();
    });
    
    Single<Foo> obs1 = pSubj
     .subscribeOn(Schedulers.io())
     //etc
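
The middleman idea above can be sketched end to end as follows. This is only an illustration under assumptions: it targets RxJava 2 (`io.reactivex`), replaces the real input stream with a second PublishSubject, and uses a hypothetical class name, Integer payload, and `n > 10` filter. Note that the Single is subscribed to the relay before any data flows; a PublishSubject does not replay items to late subscribers.

```java
import io.reactivex.Single;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

public class RelayDemo {
    // Returns the items delivered to the Single's subscriber.
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        // Hypothetical stand-in for the never-completing hot source.
        PublishSubject<Integer> hot = PublishSubject.create();
        // The middleman: it can complete without touching the hot source.
        PublishSubject<Integer> relay = PublishSubject.create();

        // Subscribe downstream first so the relayed item is not missed.
        Single<Integer> obs1 = relay.singleOrError();
        obs1.subscribe(v -> received.add(v));

        // Forward the first matching item, then complete only the relay.
        hot.filter(n -> n > 10)
           .subscribe(x -> {
               relay.onNext(x);
               relay.onComplete();
           });

        hot.onNext(5);   // filtered out
        hot.onNext(42);  // relayed; the Single succeeds with 42
        hot.onNext(99);  // ignored, the relay has already completed
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The forwarding subscription in this sketch stays attached to the hot source after the relay completes; adding take(1) before subscribe would dispose it once the item has been relayed.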