I am looking for an operator that combines two Observables
by not emitting anything until both Observables
have emitted an element (similar to combineLatest
), but then only emits elements by combining elements from one Observable
with the most recently emitted element of the other Observable
(similar to withLatestFrom
). The results would look like this (y
observable is the "control"):
Does an operator like this exist?
I solved this in Java, but the same theory should work for you.
What you have is really two basic patterns; A combineLatest
value followed by withLatestFrom
values. If withLatestFrom
triggers first then you want to skip the combineLatest
value.
Start by making the withLatestFrom
observable:
Observable<Result> wlf = o1.withLatestFrom(o2, f::apply);
Next we want to create the combineLatest
observable that emits a single value. We also want to stop this observable when wlf
triggers:
Observable<Result> cl = Observable.combineLatest(o1, o2, f::apply)
.take(1).takeUntil(wlf);
Finally add these two observable together... For convenience I made a helper method to accept any two observables and a bi-function operator:
public static <Result,
Param1, Source1 extends Param1,
Param2, Source2 extends Param2>
Observable<Result> combineThenLatestFrom(
final Observable<Source1> o1,
final Observable<Source2> o2,
final BiFunction<Param1, Param2, Result> f
) {
final Observable<Result> base = o1
.withLatestFrom(o2, f::apply);
return Observable
.combineLatest(o1, o2, f::apply)
.take(1).takeUntil(base)
.mergeWith(base);
}
And here's the test code I used to verify the method:
public static void main(final String[] args) {
final TestScheduler scheduler = new TestScheduler();
final TestSubject<String> o1 = TestSubject.create(scheduler);
final TestSubject<String> o2 = TestSubject.create(scheduler);
final Observable<String> r = combineThenLatestFrom(o1, o2, (a, b) -> a + b);
r.subscribe(System.out::println);
o1.onNext("1");
o1.onNext("2");
o2.onNext("A");
o2.onNext("B");
o2.onNext("C");
o2.onNext("D");
o1.onNext("3");
o2.onNext("E");
scheduler.triggerActions();
}
Which outputs:
2A
3D