Search code examples
javarx-java2

How can I access an upstream object, later down stream in RxJava2


Let's say I have this Observable situation:

public void main() {
    Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
            .flatMap(id -> getEvenOdd(id))
            .map(string -> {
                // I now want to join string 
                // AND the last emitted ID integer
                return null;
            });
}
    
private Observable<String> getEvenOdd(Integer id) {
    if (id % 2 == 0) {
        return Observable.just("even");
    } else {
        return Observable.just("odd");
    }
}

The flatmap has transformed Integer into String. How can I now get access to the Integer inside map?

I know that I could add a doOnNext and cache the Integer:

private Integer intCache; 
    
public void main() {
    Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
            .doOnNext(integer -> intCache = integer)
            .flatMap(id -> getEvenOdd(id))
            .map(string -> {
                return intCache.toString() + " " + string;
            });

But this seems a little hacky and expands the scope of the Integer beyond my observable chain.


Solution

  • There is a specialized flatMap for this use-case:

    http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#flatMap(io.reactivex.functions.Function,%20io.reactivex.functions.BiFunction)

    The second parameter combines the results from the flatmap with the item that caused them to be emitted.

    So the updated example is

    public void main() {
        Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
                .flatMap(id -> getEvenOdd(id),
                        (BiFunction<Integer, String, String>) (integer, string) -> { // LOOK HERE
                            return string + Integer.toString(integer);
                        })
                .map(joinedStringAndInt -> {
                    ... use joinedStringAndInt
                });
    }
    

    Where the third type in the BiFunction is the type of the combined value. Here I just chose to combined the String and Integer into another String

    As a sidenote, the doOnNext solution is hacky and unsafe if you consider there can be multiple instances of the same Observable accessing the intCache variable.