Search code examples
javajava-streamrx-javaoption-typejava-11

How to sum two Single<Optional<>> values in RxJava


How to summarize two Single<Optional<MonetaryAmount>> in Java 11?

I am using RxJava but I don't think it means anything.

For example:

Single<Optional<MonetaryAmount>> first = someMethod1(a, b);
Single<Optional<MonetaryAmount>> second = someMethod2(a, b);

I want to do something like this, not syntactically but logically:

Single<Optional<MonetaryAmount>> result = first + second;

I tried to do something like this but it doesn't work in Java 11

Single<Optional<MonetaryAmount>> result = Stream.concat(first, second)
                                                .reduce(MonetaryAmount::sum);

How can I do this?


Solution

  • Use concatMapSingle and mapOptional to get to the present values:

    Observable<Single<Optional<MonetaryAmount>>> amounts = 
        Observable.fromArray(first, second);
    
    amounts.concatMapSingle(v -> v)
    .mapOptional(v -> v) // RxJava 3
    .reduce(MonetaryAmount::sum)
    ;
    

    For RxJava 2, use a filter+map combo:

    .filter(Optional::isPresent)
    .map(Optional::get)