Search code examples
reactive-programmingrx-java2switchmap

Why onComplete not fired when an infinite stream is switchMap'ed


Using RxJava 2.2.8:

Observable.fromCallable(() -> "Some data")
            .subscribe(
                    s -> System.out.println(s),
                    e -> System.err.println(e),
                    () -> System.out.println("Completed")
            );

Output

Some data
Completed

My question is why onComplete never gets called for the following?

        Observable.interval(1, TimeUnit.SECONDS)
            .switchMap(t -> Observable.fromCallable(() -> "Some data"))
            .subscribe(
                    s -> System.out.println(s),
                    e -> System.err.println(e),
                    () -> System.out.println("Completed")
            );

Output

Some data
Some data
Some data
...

I understand Observable.interval will create a never ending stream, so no onComplete. My understanding of switchMap is that it returns an observable which fires events produced by the inner observable (cancelling any pending and flattening), in this case Observable.fromCallable.

Now, this 'inner' observable does have a definite end (unlike the outer observable), so why doesn't onComplete gets called on this inner Observable?

Why isn't the output like this?

Some data
Completed
Some data
Completed
Some data
Completed
...

Solution

  • From documentation:

    The resulting ObservableSource completes if both the upstream ObservableSource and the last inner ObservableSource

    Since upstream ObservableSource is an infinite stream, the resulting Observable will not complete.

    Also note that according to the observable contract, onComplete indicates the observable has terminated and it will not emit any further items in the future, so you will never see "Completed" followed by some other items regardless of your implementation.