
RxJava: behaviour of combineLatest


I have the following code:

    final Observable<String> a = Observable.just("a1", "a2");   
    final Observable<String> b = Observable.just("b1");

    final Observable<String> c = Observable.combineLatest(a, b, (first, second) -> first + second);

    c.subscribe(res -> System.out.println(res));

What is the expected output? I would have expected

a1b1
a2b1

But the actual output is

a2b1

Does that make sense? Which operator would generate the expected sequence?


Solution

  • As the operator's name implies, it combines the latest value of each source. If the sources are synchronous or very fast, one or more of them may run to completion before the others emit anything, and the operator then remembers only the last value of each. Here, a emits a1 and a2 before b is even subscribed, so b1 is paired only with a2. To get a1b1 as well, the source values must interleave, for example by using asynchronous sources with enough time between items that emissions from different sources do not overlap closely.

    The expected sequence can be generated in a couple of ways, depending on what your original intention was. For example, if you wanted all cross combinations, use flatMap:

    a.flatMap(aValue -> b, (aValue, bValue) -> aValue + bValue)
    .subscribe(System.out::println);
    

    If b is something expensive to recreate, cache it:

    Observable<String> cachedB = b.cache();
    a.flatMap(aValue -> cachedB, (aValue, bValue) -> aValue + bValue)
    .subscribe(System.out::println);
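
To make the ordering effect concrete, here is a minimal plain-Java sketch that models what a two-source combineLatest does with a given arrival order. It is not RxJava code: the class `CombineLatestModel`, its `Event` type, and the `combine` method are hypothetical names made up for this illustration, and the model ignores completion, errors, and threading.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a two-source combineLatest; an illustration, not RxJava itself.
class CombineLatestModel {

    // One emission: which source produced it (0 = a, 1 = b) and its value.
    static final class Event {
        final int source;
        final String value;

        Event(int source, String value) {
            this.source = source;
            this.value = value;
        }
    }

    // Replays the events in arrival order and collects what
    // combineLatest(a, b, (first, second) -> first + second) would emit.
    static List<String> combine(List<Event> arrivals) {
        String[] latest = new String[2];   // latest value seen per source
        List<String> out = new ArrayList<>();
        for (Event e : arrivals) {
            latest[e.source] = e.value;
            // Nothing is emitted until every source has produced a value.
            if (latest[0] != null && latest[1] != null) {
                out.add(latest[0] + latest[1]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Synchronous case from the question: a finishes before b emits.
        System.out.println(combine(Arrays.asList(
                new Event(0, "a1"), new Event(0, "a2"), new Event(1, "b1"))));
        // prints [a2b1]

        // Interleaved case: b1 arrives between a1 and a2.
        System.out.println(combine(Arrays.asList(
                new Event(0, "a1"), new Event(1, "b1"), new Event(0, "a2"))));
        // prints [a1b1, a2b1]
    }
}
```

The first call mirrors the question, where `Observable.just("a1", "a2")` emits both items synchronously on subscription, so `a1` has already been overwritten by the time `b1` arrives. The second call shows the arrival order that would be needed for combineLatest to produce the sequence the question expected.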