rx-java2

RxJava: Schedulers.trampoline() versus concatMap


Based on the documentation, it seems that Schedulers.trampoline() ensures that elements are emitted first-in, first-out (i.e., in order). It also seems that the point of concatMap is to ensure that everything is lined up appropriately and then emitted. So I was wondering whether there is ever a point in applying subscribeOn/observeOn(Schedulers.trampoline()) and then using the concatMap operator afterwards, as opposed to a regular mapping operator.


Solution

  • Yes, there's a point. Take this example:

    Observable.just(1, 2, 3, 4, 5)
        .subscribeOn(Schedulers.trampoline())
        .flatMap(
            a -> {
              if (a < 3) {
                return Observable.just(a).delay(3, TimeUnit.SECONDS);
              } else {
                return Observable.just(a);
              }
            })
        .doOnNext(
            a -> System.out.println("Element: " + a + ", on: " + Thread.currentThread().getName()))
        .subscribe();
    

    Here's the output:

    Element: 3, on: main
    Element: 4, on: main
    Element: 5, on: main
    Element: 1, on: RxComputationScheduler-1
    Element: 2, on: RxComputationScheduler-2
    

    What's happening here is that 1 and 2 reach the flatMap operator in sequence, but the inner streams for these elements are delayed by 3 seconds. Note that flatMap eagerly subscribes to the inner streams. That is, it does not wait for one stream to finish (with onComplete) before subscribing to the next inner stream (which is what concatMap does).

    So the inner streams of 1 and 2 are delayed by 3 seconds. You can think of this as an external I/O call that takes a while. Meanwhile, the next three elements (3, 4, 5) enter flatMap and their inner streams finish immediately. That's why they appear first in the output, in their original order, on the main thread.

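    Once the 3 seconds have elapsed, elements 1 and 2 are emitted. They arrive on computation threads because delay runs on the computation scheduler by default. Note that there's no guarantee that 1 would come before 2.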
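
    The eager behaviour described above can be modelled without RxJava. The following is only a plain-Java analogy built on CompletableFuture, not how flatMap is implemented; the class name EagerMergeSketch and the helper eagerOrder are hypothetical, and the delay is given in milliseconds to keep the run short. Every "inner stream" is started as soon as its element arrives, and values are recorded in the order in which they complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy of eager (flatMap-like) subscription. Not RxJava code.
class EagerMergeSketch {

    // Hypothetical helper: starts the work for every element immediately and
    // returns the values in completion order. Elements below 3 are delayed.
    static List<Integer> eagerOrder(List<Integer> source, long delayMillis) {
        List<Integer> seen = new CopyOnWriteArrayList<>(); // written from several threads
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int a : source) {
            if (a < 3) {
                // Slow "inner stream": completes later on another thread.
                Executor delayed =
                        CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
                pending.add(CompletableFuture.runAsync(() -> seen.add(a), delayed));
            } else {
                // Fast "inner stream": completes right away on the calling thread.
                seen.add(a);
            }
            // No waiting here: the loop moves straight on to the next element.
        }
        // Wait for the delayed work so that the returned list is complete.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(eagerOrder(List.of(1, 2, 3, 4, 5), 300));
    }
}
```

    Running this should print 3, 4, 5 first, followed by 1 and 2 in either order, which mirrors the flatMap output shown earlier.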

    Now replace flatMap with concatMap and you'll see that the sequence is maintained:

    Element: 1, on: RxComputationScheduler-1
    Element: 2, on: RxComputationScheduler-2
    Element: 3, on: RxComputationScheduler-2
    Element: 4, on: RxComputationScheduler-2
    Element: 5, on: RxComputationScheduler-2
    

    Why? Because that's how concatMap works. Element 1 arrives and is used in an I/O call. It takes 3 seconds before its inner stream emits an onComplete. The inner streams corresponding to the remaining elements are not subscribed to by concatMap until the first stream emits an onComplete. As soon as it does, the next stream (Observable.just(2).delay(3, TimeUnit.SECONDS)) is subscribed to, and so on. So you can see how the order is maintained.

    The thing you need to remember about these two operators is: flatMap eagerly subscribes to the inner streams as the elements arrive. On the other hand, concatMap waits for one stream to finish before it subscribes to the next one. That's why you can't make parallel calls with concatMap.
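
    The cost of that one-at-a-time subscription can be sketched in plain Java as well. Again, this is an analogy using CompletableFuture rather than RxJava itself; SequentialConcatSketch, inner and sequentialOrder are hypothetical names, and the blocking join() is a simplification (concatMap does not block a thread while it waits). The next piece of work is only started once the previous one has finished, so the order is preserved and the delays add up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy of sequential (concatMap-like) subscription. Not RxJava code.
class SequentialConcatSketch {

    // Hypothetical stand-in for an inner stream: yields `a`, delayed when a < 3.
    static CompletableFuture<Integer> inner(int a, long delayMillis) {
        if (a < 3) {
            Executor delayed =
                    CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
            return CompletableFuture.supplyAsync(() -> a, delayed);
        }
        return CompletableFuture.completedFuture(a);
    }

    // Starts each inner only after the previous one has completed.
    static List<Integer> sequentialOrder(List<Integer> source, long delayMillis) {
        List<Integer> seen = new ArrayList<>();
        for (int a : source) {
            // join() blocks until this inner finishes; only then does the loop continue.
            seen.add(inner(a, delayMillis).join());
        }
        return seen;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<Integer> order = sequentialOrder(List.of(1, 2, 3, 4, 5), 300);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(order + " after about " + elapsedMillis + " ms");
    }
}
```

    With a 300 ms delay, the two slow elements run back to back, so the total should come out at roughly 600 ms instead of the roughly 300 ms an eager approach would need. That is the trade-off: guaranteed order in exchange for no overlap between the calls.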