Based on the documentation, it seems that Schedulers.trampoline() ensures that elements are emitted first-in, first-out (i.e. in order). It also seems that the point of concatMap is to ensure that everything is lined up appropriately and then emitted. So I was wondering whether there is ever a point in applying subscribeOn/observeOn(Schedulers.trampoline()) and then using the concatMap operator afterwards, as opposed to a regular mapping operator.
Yes, there's a point. Take this example:
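Observable.just(1, 2, 3, 4, 5)
    .subscribeOn(Schedulers.trampoline())
    .flatMap(
        a -> {
          if (a < 3) {
            // Elements 1 and 2: the inner stream emits only after 3 seconds.
            return Observable.just(a).delay(3, TimeUnit.SECONDS);
          } else {
            // Elements 3, 4 and 5: the inner stream emits immediately.
            return Observable.just(a);
          }
        })
    .doOnNext(
        a -> System.out.println("Element: " + a + ", on: " + Thread.currentThread().getName()))
    .subscribe();
// In a standalone main method, keep the main thread alive for more than
// 3 seconds (e.g. with Thread.sleep) so that the delayed elements are printed.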
Here's the output:
Element: 3, on: main
Element: 4, on: main
Element: 5, on: main
Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2
What's happening here is that 1 and 2 reach the flatMap operator in sequence, but the inner streams for these elements are delayed by 3 seconds. Note that flatMap eagerly subscribes to the inner streams. That is, it does not wait for one stream to finish (with onComplete) before subscribing to the next inner stream (which is what concatMap does).
So the inner streams of 1 and 2 are delayed by 3 seconds. You can think of this as an external I/O call that takes a while. Meanwhile, the next 3 elements (3, 4, 5) enter the flatMap and their streams finish immediately. That's why 3, 4 and 5 appear first in the output, in their original order.
Once the 3 seconds have elapsed, elements 1 and 2 are emitted. Note that there's no guarantee that 1 will come before 2.
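To make the eager-subscription behaviour concrete, here is a small timeline model. It is a sketch in plain Java, not RxJava: the class FlatMapTimeline and its methods are hypothetical names, and it assumes every inner stream is subscribed at time 0 and emits exactly one value after its delay.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy timeline model, NOT RxJava. All names here are hypothetical.
class FlatMapTimeline {

    // Mirrors the mapper in the example: elements below 3 are delayed by 3 s.
    static long delaySeconds(int element) {
        return element < 3 ? 3 : 0;
    }

    // flatMap-style policy: every inner stream is subscribed as soon as its
    // element arrives (modelled as t = 0), so its value is emitted at t = delay.
    static List<Integer> emissionOrder(List<Integer> source) {
        List<Integer> order = new ArrayList<>(source);
        // List.sort is stable, so ties keep source order in this model.
        // Real flatMap makes no such promise for streams finishing together.
        order.sort(Comparator.comparingLong(FlatMapTimeline::delaySeconds));
        return order;
    }

    public static void main(String[] args) {
        System.out.println(emissionOrder(List.of(1, 2, 3, 4, 5)));
    }
}
```

Running it prints [3, 4, 5, 1, 2], matching the output above. The only simplification is the tie between 1 and 2, which the model resolves in source order while the real operator may not.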
Now replace flatMap with concatMap and you'll see that the sequence is maintained:
Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2
Element: 3, on: RxComputationScheduler-2
Element: 4, on: RxComputationScheduler-2
Element: 5, on: RxComputationScheduler-2
Why? Because that's how concatMap works. Element 1 arrives and is used in an I/O call. It takes 3 seconds before its inner stream emits onComplete. The inner streams corresponding to the remaining elements are not subscribed to by concatMap until the first stream emits onComplete. As soon as it does, the next stream (Observable.just(2).delay(3, TimeUnit.SECONDS)) is subscribed to, and so on. So you can see how the order is maintained.
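The sequential policy can be sketched with a running clock. This is a plain-Java model, not RxJava: ConcatMapTimeline and its methods are hypothetical names, and it assumes each inner stream emits one value after its delay and completes at the same instant.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy timeline model, NOT RxJava. All names here are hypothetical.
class ConcatMapTimeline {

    // Mirrors the mapper in the example: elements below 3 are delayed by 3 s.
    static long delaySeconds(int element) {
        return element < 3 ? 3 : 0;
    }

    // concatMap-style policy: the inner stream for an element is subscribed
    // only after the previous inner stream has completed, so delays add up.
    static Map<Integer, Long> emissionTimes(List<Integer> source) {
        Map<Integer, Long> times = new LinkedHashMap<>(); // keeps emission order
        long clock = 0; // seconds since the first subscription
        for (int element : source) {
            clock += delaySeconds(element); // wait for this stream to finish
            times.put(element, clock);
        }
        return times;
    }

    public static void main(String[] args) {
        System.out.println(emissionTimes(List.of(1, 2, 3, 4, 5)));
    }
}
```

Running it prints {1=3, 2=6, 3=6, 4=6, 5=6}: the order is preserved, but the two delays are paid one after the other, so the last element arrives at 6 seconds rather than 3.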
The thing you need to remember about these two operators is: flatMap eagerly subscribes to the inner streams, as and when the elements arrive. On the other hand, concatMap waits for one stream to finish before it subscribes to the next one. That's why you can't make parallel calls with concatMap.
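If you need both ordering and concurrency, one option is to start every call eagerly and then collect the results in source order. RxJava offers this combination as concatMapEager. The sketch below is only a plain-Java analogy of that idea using CompletableFuture, not RxJava itself: EagerOrdered, call and run are hypothetical names, and the 300 ms delay stands in for a slow I/O call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy, NOT RxJava. All names here are hypothetical.
class EagerOrdered {

    // Stand-in for a slow call: elements below 3 complete after 300 ms.
    static CompletableFuture<Integer> call(int element) {
        long delayMs = element < 3 ? 300 : 0;
        return CompletableFuture.supplyAsync(
                () -> element,
                CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS));
    }

    static List<Integer> run(List<Integer> source) {
        // Start every call up front, so the delays overlap (eager, like flatMap).
        List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
        for (int element : source) {
            inFlight.add(call(element));
        }
        // Collect results in source order (ordered, like concatMap).
        List<Integer> results = new ArrayList<>();
        for (CompletableFuture<Integer> future : inFlight) {
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 4, 5)));
    }
}
```

Running it prints [1, 2, 3, 4, 5]. Because both slow calls are in flight at the same time, the total wait is roughly one delay rather than two.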