Search code examples
rx-javaobservablereactivexrx-kotlin

Flatten Observable of Observables


What I would like to do is create a function which runs another function every second. The second function returns Observables<A> and I want the first function to return Observables<A> as well instead of Observable<Observable<A>>

for example:

private A calcA(){
   ...
   return new A(...)
}

public Observable<A> getAs(){
   return Observable.create( subscriber -> {
      Bool condition = ...
      do {
         subscriber.onNext(calcA())
      } while (condition)
      subscriber.onComplete()
   })
}

public Observable<A> pollAs(){
   return Observable.create(subscriber -> {
      do {
         subscriber.onNext(getAs()) // Flatten here I guess
         Thread.sleep(1000)
      } while(true)
   })

So I would like to do something similar (I tried to write this in a Java-ish way, but I will use Kotlin)


Solution

  • You don't need to use the flatMap() operator to flatten the inner observable, since you only want to repeatedly subscribe to the same observable.

    public Observable<A> getAs() {
       return Observable.fromCallable( () -> calcA() )
                .repeat()
                .takeWhile( v -> !condition( v );
    }
    

    getAs() will emit items until the condition has been reached. It will then complete.

    public Observable<A> pollAs(){
       return getAs()
                .repeatWhen( completed -> completed.delay(1000, TimeUnit.MILLISECONDS) );
    

    pollAs() will continually resubscribe to the getAs() observable, pausing for a second between each subscription.

    Edit: I have uploaded a 6-month-duration example to https://pastebin.com/kSmi24GF It shows that you have to keep advancing the time for data to come out.