Search code examples
rx-javarx-java2

Rxjava : Emit reduce value when switchmap happens


reduce operator emits value at the end of the observable (when completed).

I'm looking for a way to use reduce inside a switchmap. I want the sum of the infinite internal observable values when outer observable emits values or complete.

@Test
public void emit_value_when_switchmap() throws InterruptedException {

    Observable.interval(0, 2000, TimeUnit.MILLISECONDS)
            .switchMapMaybe(
                    l -> Observable.interval(0, 500, TimeUnit.MILLISECONDS)
                            .reduce(Long::sum)
                            .map(a -> a + ": Final")
            )
            .subscribe(e -> System.out.println(e));


    Thread.sleep(10000);
}

This diagram illustrates the wanted behavior :

//events: --------x-----1----2---1---x-----3--0--------x-1---1----|  
//result: ---------------------------4-----------------3----------2  

Solution

  • This is probably not the best way to do it, but it does the job for now, until someone comes up with a fancier method to resolve your use-case.

    Please have a look at my test, I think it resolves your question:

    Environment: (gradle -- groovy)

    implementation "io.reactivex.rxjava2:rxjava:2.2.8"
    testImplementation "org.junit.jupiter:junit-jupiter-api:5.7.0"
    testImplementation "org.junit.jupiter:junit-jupiter-params:5.7.0"
    testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:5.7.0"
    

    Test 3 emits are being made from the source observable. Each time a new value is emitted, the inner observable is subscribed to. When a new value is emitted, the inner observable completes and pushes the value downstream. Then the newly emitted value will be processed by subscribing to a new inner-stream.

      @Test
      public void takeWhileReduce() {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<Integer> source = PublishSubject.create();
    
        Observable<Long> publish = source.publish(
            multicast -> {
              return multicast.flatMap(
                  o -> {
                    return Observable.interval(0, 500, TimeUnit.MILLISECONDS, scheduler) //
                        .takeUntil(multicast)
                        .reduce(Long::sum)
                        .toObservable();
                  },
                  1);
            });
    
        TestObserver<Long> test = publish.test();
    
        source.onNext(42);
    
        scheduler.advanceTimeBy(1500, TimeUnit.MILLISECONDS);
    
        // action - push next value - flatMapped value will complete and push value
        source.onNext(42);
        // assert - values emitted: 0,1,2,3
        test.assertValuesOnly(6L);
    
        // next value is flatMapped
        scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
        // action - push next value - flatMapped value will complete and push value
        source.onNext(42);
    
        // assert - values emitted: 0,1,2
        test.assertValuesOnly(6L, 3L);
    
        scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
    
        // action - push next value - flatMapped value will complete and push value
        source.onNext(42);
    
        // assert - values emitted: 0,1
        test.assertValuesOnly(6L, 3L, 1L);
      }