reactive-programming rx-swift reactivex

Rx operator that starts like combineLatest but then acts like withLatestFrom


I am looking for an operator that combines two Observables by not emitting anything until both Observables have emitted an element (similar to combineLatest), but then only emits elements by combining elements from one Observable with the most recently emitted element of the other Observable (similar to withLatestFrom). The results would look like this (y observable is the "control"):

[Image: marble diagram of the desired output; not available]

Does an operator like this exist?


Solution

  • I solved this in Java (RxJava 1.x), but the same approach should work in RxSwift.

    What you have is really two basic patterns: a single combineLatest value followed by withLatestFrom values. If withLatestFrom triggers first, you want to skip the combineLatest value.

    Start by making the withLatestFrom observable:

    Observable<Result> wlf = o1.withLatestFrom(o2, f::apply);
    

    Next we want to create the combineLatest observable that emits a single value. We also want to stop this observable when wlf triggers:

    Observable<Result> cl = Observable.combineLatest(o1, o2, f::apply)
        .take(1).takeUntil(wlf);
    

    Finally, merge these two observables. For convenience, I wrote a helper method that accepts any two observables and a BiFunction:

    public static <Result,
        Param1, Source1 extends Param1,
        Param2, Source2 extends Param2>
    Observable<Result> combineThenLatestFrom(
        final Observable<Source1> o1,
        final Observable<Source2> o2,
        final BiFunction<Param1, Param2, Result> f
    ) {
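        // withLatestFrom: o1 drives the output, paired with the latest o2 value.
        final Observable<Result> base = o1
            .withLatestFrom(o2, f::apply);
        // combineLatest contributes at most one value (the first complete pair)
        // and is cut off as soon as base emits; base supplies everything after.
        return Observable
            .combineLatest(o1, o2, f::apply)
            .take(1).takeUntil(base)
            .mergeWith(base);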
    }
    

    And here's the test code I used to verify the method:

    public static void main(final String[] args) {
        final TestScheduler scheduler = new TestScheduler();
        final TestSubject<String> o1 = TestSubject.create(scheduler);
        final TestSubject<String> o2 = TestSubject.create(scheduler);
        final Observable<String> r = combineThenLatestFrom(o1, o2, (a, b) -> a + b);
        r.subscribe(System.out::println);
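        o1.onNext("1");
        o1.onNext("2");
        o2.onNext("A"); // first complete pair: yields 2A
        o2.onNext("B"); // o2 alone no longer triggers output
        o2.onNext("C");
        o2.onNext("D");
        o1.onNext("3"); // o1 paired with the latest o2 value: yields 3D
        o2.onNext("E");
        scheduler.triggerActions(); // run the queued events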
    }
    

    Which outputs:

    2A
    3D
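
For anyone who wants to check the expected behaviour without pulling in RxJava, here is a minimal plain-Java sketch of the same two rules as a small state machine. The class `CombineThenLatestFromSketch` and its `onControl`, `onOther`, `emitted`, and `demo` methods are hypothetical names made up for this illustration; they are not part of any Rx library. The sketch ignores completion, errors, and threading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical illustration only: models the desired operator without RxJava.
final class CombineThenLatestFromSketch<A, B, R> {
    private final BiFunction<A, B, R> combiner;
    private final List<R> emitted = new ArrayList<>();
    private A latestControl;
    private B latestOther;
    private boolean hasControl;
    private boolean hasOther;
    private boolean started; // true once the first value has been emitted

    CombineThenLatestFromSketch(BiFunction<A, B, R> combiner) {
        this.combiner = combiner;
    }

    // Element from the control source (o1 in the answer).
    void onControl(A a) {
        latestControl = a;
        hasControl = true;
        if (hasOther) {
            // withLatestFrom rule: the control source always emits
            // once the other source has produced at least one value.
            started = true;
            emitted.add(combiner.apply(a, latestOther));
        }
    }

    // Element from the other source (o2 in the answer).
    void onOther(B b) {
        latestOther = b;
        hasOther = true;
        if (!started && hasControl) {
            // combineLatest rule, applied only to the very first pair.
            started = true;
            emitted.add(combiner.apply(latestControl, b));
        }
    }

    List<R> emitted() {
        return emitted;
    }

    // Replays the event order used in the test code above.
    static List<String> demo() {
        CombineThenLatestFromSketch<String, String, String> s =
            new CombineThenLatestFromSketch<String, String, String>((a, b) -> a + b);
        s.onControl("1");
        s.onControl("2");
        s.onOther("A");
        s.onOther("B");
        s.onOther("C");
        s.onOther("D");
        s.onControl("3");
        s.onOther("E");
        return s.emitted();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2A, 3D]
    }
}
```

Replaying the same event order as the test gives the same two values, 2A and 3D. If the other source emits before the control source, the first control element emits exactly once through the withLatestFrom rule, and the `started` flag keeps the combineLatest rule from firing afterwards.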