I use SQLBrite to listen for changes to tables a and b, and the combineLatest operator to combine the observables that SQLBrite produces. The BiFunction processes the items emitted by observableA and observableB.
private CompositeDisposable mSubscriptions = new CompositeDisposable();

private void initialize() {
    QueryObservable observableA = mDb.createQuery("table_a", "select * from table_a", null);
    QueryObservable observableB = mDb.createQuery("table_b", "select * from table_b", null);
    ResourceSubscriber<List<ResultItem>> subscriber = Flowable.combineLatest(
            RxJavaInterop.toV2Observable(observableA
                    .mapToList(mTableAMapperFunction)).toFlowable(BackpressureStrategy.LATEST),
            RxJavaInterop.toV2Observable(observableB
                    .mapToList(mTableBMapperFunction)).toFlowable(BackpressureStrategy.LATEST),
            new BiFunction<List<ItemA>, List<ItemB>, List<ResultItem>>() {
                @Override
                public List<ResultItem> apply(@io.reactivex.annotations.NonNull List<ItemA> aItems, @io.reactivex.annotations.NonNull List<ItemB> bItems) throws Exception {
                    List<ResultItem> resultItems = convertToResultItems(aItems, bItems); // long process here, convert aItems and bItems to resultItems
                    return resultItems;
                }
            }
    )
            .onBackpressureLatest()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new ResourceSubscriber<List<ResultItem>>() {
                @Override
                public void onNext(List<ResultItem> resultItems) {
                    adapter.addData(resultItems);
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onComplete() {
                }
            });
    mSubscriptions.add(subscriber);
}
Question: If the BiFunction runs for a long time (e.g. 10 seconds), longer than the interval at which the observables emit (e.g. every 1 second), it does unnecessary work. I only need the result for the latest emitted items, but the BiFunction handles the emitted items one by one, so it also processes old items that I no longer need. I want the BiFunction to skip the old items and, each time apply() completes, continue with only the latest emitted items, to avoid wasting resources and time. Does RxJava have something similar to backpressure for the BiFunction, or is there another way to solve this problem?
The figure shows the current and expected BiFunction timelines. figure link
I found two ways to solve this problem, but both have flaws.
Method 1: combine "aItems" and "bItems" into a Pair, then pass the reference to switchMap and do the work there.
Flaw: switchMap only emits the latest result to the subscriber, but the unnecessary work is still done.
Method 2: also combine "aItems" and "bItems", then pass the reference to onNext and do the work there.
Flaw: this blocks the UI thread.
You can just pass the pair of values along in combineLatest's combiner function and use observeOn to place the computation off the original source threads:
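Flowable.combineLatest(srcA, srcB, (a, b) -> Pair.of(a, b))
    // keep only the newest pair while the downstream is busy
    .onBackpressureLatest()
    // buffer size 1: request one pair at a time, so stale pairs are not queued
    .observeOn(Schedulers.computation(), false, 1)
    .map(pair -> compute(pair))
    .observeOn(AndroidSchedulers.mainThread())
    ...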
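
To illustrate why this skips stale pairs, here is a minimal plain-JDK sketch of the same "keep only the newest pending item" idea. It is not RxJava and not how the operators are implemented internally; LatestOnlyWorker and LatestOnlyDemo are hypothetical names made up for this example, and the latches only exist to make the timing deterministic. While the slow task is busy with one value, newer submissions overwrite each other, so the next run sees only the most recent one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical helper (not an RxJava class): runs a slow task on a single
// background thread and keeps at most one pending input, the newest one.
final class LatestOnlyWorker<T> {
    private final AtomicReference<T> pending = new AtomicReference<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Consumer<T> slowTask;

    LatestOnlyWorker(Consumer<T> slowTask) {
        this.slowTask = slowTask;
    }

    // Offer a new value. An older value that has not been picked up yet is overwritten.
    void submit(T value) {
        // Schedule a run only when the slot was empty; otherwise a run is already queued.
        if (pending.getAndSet(value) == null) {
            executor.execute(this::drain);
        }
    }

    // Take whatever is newest at this moment and process it.
    private void drain() {
        T latest = pending.getAndSet(null);
        if (latest != null) {
            slowTask.accept(latest);
        }
    }

    void shutdownAndAwait() throws InterruptedException {
        executor.shutdown(); // already queued runs still execute
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}

final class LatestOnlyDemo {
    // Submits 1, then 2, 3, 4 while the task is still busy with 1.
    static List<Integer> run() {
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch firstStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        LatestOnlyWorker<Integer> worker = new LatestOnlyWorker<Integer>(value -> {
            firstStarted.countDown();
            try {
                release.await(); // stands in for the long computation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            processed.add(value);
        });
        try {
            worker.submit(1);
            firstStarted.await(); // the task is now busy with 1
            worker.submit(2);     // becomes pending
            worker.submit(3);     // overwrites 2
            worker.submit(4);     // overwrites 3
            release.countDown();  // let the slow task finish
            worker.shutdownAndAwait();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 4]
    }
}
```

Values 2 and 3 are never processed, which corresponds to the expected timeline in the question: after each completed computation, only the latest input is handled.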