Search code examples
androidrx-javarx-java2rx-kotlinrx-kotlin2

RxJava: Combining hot and cold observable to wait for each other


I have my observables defined like this

    val initLoading = Observable.fromCallable { println("${System.currentTimeMillis()}") }
            .subscribeOn(Schedulers.computation())
            .delay(WAIT_TIME, TimeUnit.SECONDS)
            .map { "loading ${System.currentTimeMillis()}" }
            .observeOn(AndroidSchedulers.mainThread())

    val click = RxView.clicks(button).map { "click ${System.currentTimeMillis()}" }
    initLoading.concatWith(click)
            .subscribeBy(
                    onNext = { println("result $it") },
                    onError = { throw it }
            )

initialLoading starts running at Activity's onCreate method. click is executed on button click. I have two cases and first is working, second isn't.

case 1

activity starts and button is clicked after WAIT_TIME seconds. Output:

   01-23 13:08:07.170  I/System.out: 1516698487170
   01-23 13:08:17.174  I/System.out: result loading 1516698497172
   01-23 13:08:29.258  I/System.out: result click 1516698509258

case 2

activity starts and button is clicked before WAIT_TIME period is over. Output

   01-23 13:09:07.392 I/System.out: 1516698547392
   01-23 13:09:17.398 I/System.out: result loading 1516698557395

so, the problem is that the click event is lost. I want the click event to wait for the loading, and then continue working. in short, case 2 output should be the same as case 1.

How can i chive this using rx operators. I tried merge but it just combines both and click event doesn't wait for loading.

I also tried reply, cache, publish, share but couldn't get the right combination of them to work as I want.


Solution

  • The concatWith operator is good for your use case but the second observable should start to store click events immediately after it is created so that the stored events can be emitted when the observable is subscribed to (which happens when initLoading completes). This can be achieved by modifying your click observable with replay() and connect().

    val replayedClicks = click.replay();
    replayedClicks.connect(); // The original click observable is subscribed to at this point
    

    Now you can use replayedClicks in the concatWith, and its stored events will be replayed after initLoading finishes:

    initLoading.concatWith(replayedClicks)
            .subscribeBy(
                    onNext = { println("result $it") },
                    onError = { throw it }
            )