The reduce operator emits its value only at the end of the observable (when it completes).
I'm looking for a way to use reduce inside a switchMap. I want the sum of the infinite inner observable's values whenever the outer observable emits a value or completes.
@Test
public void emit_value_when_switchmap() throws InterruptedException {
    Observable.interval(0, 2000, TimeUnit.MILLISECONDS)
        .switchMapMaybe(
            l -> Observable.interval(0, 500, TimeUnit.MILLISECONDS)
                .reduce(Long::sum)
                .map(a -> a + ": Final")
        )
        .subscribe(e -> System.out.println(e));
    Thread.sleep(10000);
}
This diagram illustrates the desired behavior:
//events: --------x-----1----2---1---x-----3--0--------x-1---1----|
//result: ---------------------------4-----------------3----------2
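To make the diagram concrete, here is a minimal plain-Java sketch (no RxJava involved) that replays the events line and computes the expected results. The class name WindowSums and the method sumPerWindow are hypothetical, introduced only for illustration. It assumes 'x' marks an outer emission, a digit is an inner value, '|' is the outer completion, and that a window with no inner values emits nothing (mirroring how reduce without a seed behaves on an empty stream).

```java
import java.util.ArrayList;
import java.util.List;

final class WindowSums {

    // Hypothetical helper: replays a marble-style string and returns the sum
    // emitted at each window boundary ('x' or '|').
    static List<Long> sumPerWindow(String events) {
        List<Long> results = new ArrayList<>();
        long sum = 0;
        int count = 0;
        boolean windowOpen = false; // no inner stream exists before the first 'x'

        for (char c : events.toCharArray()) {
            if (Character.isDigit(c)) {
                if (windowOpen) {
                    sum += c - '0';
                    count++;
                }
            } else if (c == 'x' || c == '|') {
                // Boundary: close the current window and emit its sum, if any.
                if (windowOpen && count > 0) {
                    results.add(sum);
                }
                sum = 0;
                count = 0;
                windowOpen = (c == 'x'); // 'x' opens a new window, '|' ends everything
            }
            // '-' only represents time passing and is ignored
        }
        return results;
    }

    public static void main(String[] args) {
        String events = "--------x-----1----2---1---x-----3--0--------x-1---1----|";
        System.out.println(sumPerWindow(events)); // prints [4, 3, 2]
    }
}
```

The three sums line up with the result line of the diagram: 4 at the second x, 3 at the third x, and 2 at completion.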
This is probably not the best way to do it, but it does the job until someone comes up with a more elegant solution for your use case.
Please have a look at my test; I think it answers your question:
Environment (Gradle, Groovy DSL):
implementation "io.reactivex.rxjava2:rxjava:2.2.8"
testImplementation "org.junit.jupiter:junit-jupiter-api:5.7.0"
testImplementation "org.junit.jupiter:junit-jupiter-params:5.7.0"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:5.7.0"
Test: the source observable emits several values (four onNext calls, producing three sums). Each time a value is emitted, a new inner observable is subscribed to. When the next value arrives, the current inner observable completes and pushes its sum downstream. The newly emitted value is then processed by subscribing to a new inner stream.
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

@Test
public void takeWhileReduce() {
    TestScheduler scheduler = new TestScheduler();
    PublishSubject<Integer> source = PublishSubject.create();
    // publish(selector) lets the source be used both as the flatMap input
    // and as the stop signal for takeUntil
    Observable<Long> publish = source.publish(
        multicast -> {
            return multicast.flatMap(
                o -> {
                    return Observable.interval(0, 500, TimeUnit.MILLISECONDS, scheduler)
                        .takeUntil(multicast) // complete the inner stream on the next source value
                        .reduce(Long::sum)
                        .toObservable();
                },
                1); // maxConcurrency = 1: only one inner stream is active at a time
        });
    TestObserver<Long> test = publish.test();
    source.onNext(42);
    scheduler.advanceTimeBy(1500, TimeUnit.MILLISECONDS);
    // action - push next value - the active inner stream completes and pushes its sum
    source.onNext(42);
    // assert - inner values emitted: 0,1,2,3 (sum 6)
    test.assertValuesOnly(6L);
    // the next value is now flatMapped
    scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
    // action - push next value - the active inner stream completes and pushes its sum
    source.onNext(42);
    // assert - inner values emitted: 0,1,2 (sum 3)
    test.assertValuesOnly(6L, 3L);
    scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
    // action - push next value - the active inner stream completes and pushes its sum
    source.onNext(42);
    // assert - inner values emitted: 0,1 (sum 1)
    test.assertValuesOnly(6L, 3L, 1L);
}
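The asserted values 6, 3 and 1 follow from simple tick arithmetic, which this small plain-Java sketch reproduces without RxJava. The class IntervalWindowSum and the method expectedSum are hypothetical names used only for illustration. The sketch assumes an interval with zero initial delay (ticks at 0, period, 2 * period, ...) and that a tick falling exactly on the end of the advanced time window is still delivered, which is how the TestScheduler behaves in the test above.

```java
final class IntervalWindowSum {

    // Hypothetical helper: sum of the interval values 0..n that are emitted
    // while virtual time advances by windowMillis, with one tick every
    // periodMillis and the first tick at time 0.
    static long expectedSum(long windowMillis, long periodMillis) {
        long lastTick = windowMillis / periodMillis; // tick at the window end is included
        long sum = 0;
        for (long tick = 0; tick <= lastTick; tick++) {
            sum += tick;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(expectedSum(1500, 500)); // 0+1+2+3 -> prints 6
        System.out.println(expectedSum(1000, 500)); // 0+1+2   -> prints 3
        System.out.println(expectedSum(500, 500));  // 0+1     -> prints 1
    }
}
```

Each inner interval restarts from 0 because a fresh interval is subscribed for every source value, which is why the windows are summed independently.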