Tags: java, android, rx-java, rx-java2, sqlbrite

RxJava 2: an approach similar to backpressure for a BiFunction


I use SQLBrite to listen for changes to tables a and b, and the combineLatest operator to combine the observables that SQLBrite produces. The BiFunction processes the items emitted by observableA and observableB.

private CompositeDisposable mSubscriptions = new CompositeDisposable();
private void initialize(){
    QueryObservable observableA = mDb.createQuery("table_a", "select * from table_a", null);
    QueryObservable observableB = mDb.createQuery("table_b", "select * from table_b", null);
    ResourceSubscriber<List<ResultItem>> subscriber = Flowable.combineLatest(
            RxJavaInterop.toV2Observable(observableA
                    .mapToList(mTableAMapperFunction)).toFlowable(BackpressureStrategy.LATEST)
            , 
            RxJavaInterop.toV2Observable(observableB
                    .mapToList(mTableBMapperFunction)).toFlowable(BackpressureStrategy.LATEST)
            , new BiFunction<List<ItemA>, List<ItemB>, List<ResultItem>>() {
                @Override
                public List<ResultItem> apply(@io.reactivex.annotations.NonNull List<ItemA> aItems, @io.reactivex.annotations.NonNull List<ItemB> bItems) throws Exception {
                    List<ResultItem> resultItems = convertToResultItems(aItems, bItems);    // long process here, convert aItems and bItems to resultItems
                    return resultItems;
                }
            }
    )
            .onBackpressureLatest()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new ResourceSubscriber<List<ResultItem>>() {
                @Override
                public void onNext(List<ResultItem> resultItems) {
                    adapter.addData(resultItems);
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onComplete() {
                }
            });
    mSubscriptions.add(subscriber);
}

Question: If the BiFunction runs for too long (e.g. 10 seconds), longer than the interval at which the observables trigger (e.g. every 1 second), it does unnecessary work. I only need the latest emitted item, but the BiFunction handles the emitted items one by one, so it also processes old items that I don't need. I want the BiFunction to skip old items and, each time apply() completes, handle only the latest emitted item, to avoid wasting resources and time. Does RxJava have an approach similar to backpressure for a BiFunction, or is there another way to solve this problem?

The figure shows the current and expected BiFunction timelines (figure link).
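The expected timeline can be approximated with a small virtual-time model in plain Java, without RxJava. This is only a sketch under stated assumptions: item i is emitted at exactly i * intervalMs, every run of the long process takes exactly workMs, and the names TimelineModel and processedLatestOnly are hypothetical, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;

public class TimelineModel {

    /**
     * Returns the indices of the items a "latest only" worker would process.
     * Assumes item i is emitted at time i * intervalMs (intervalMs > 0) and
     * each processing run takes exactly workMs.
     */
    public static List<Integer> processedLatestOnly(int count, long intervalMs, long workMs) {
        List<Integer> processed = new ArrayList<>();
        long now = 0;   // virtual time at which the worker is free
        int next = 0;   // oldest item that has been neither processed nor skipped
        while (next < count) {
            // If nothing new has been emitted yet, wait for the next emission.
            now = Math.max(now, next * intervalMs);
            // Newest item emitted at or before `now`.
            int latest = (int) Math.min(count - 1, now / intervalMs);
            processed.add(latest);
            now += workMs;      // the worker is busy for workMs
            next = latest + 1;  // everything up to `latest` is now stale
        }
        return processed;
    }

    public static void main(String[] args) {
        // 25 emissions, one per second, 10 seconds of work per item.
        System.out.println(processedLatestOnly(25, 1_000, 10_000)); // [0, 10, 20, 24]
    }
}
```

With one emission per second and 10 seconds of work, the model processes only 4 of the 25 items, whereas handling them one by one would process all 25.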

I found two ways to solve this problem, but each has a flaw.

Method 1: combine aItems and bItems into a Pair, then pass the reference to switchMap and do the work there.

Flaw: switchMap only emits the latest result to the subscriber, but the unnecessary work is still done.

Method 2: also combine aItems and bItems, then pass the reference to onNext and do the work there.

Flaw: this blocks the UI thread.


Solution

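  • You can simply pass the pair of values along from combineLatest's combiner function and use observeOn to move the computation off the original source threads. onBackpressureLatest keeps only the newest pair while the downstream is busy, and the buffer size of 1 passed to observeOn means pairs are requested one at a time: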

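     Flowable.combineLatest(srcA, srcB, (a, b) -> Pair.of(a, b)) // cheap combiner: only pairs the values
     .onBackpressureLatest()                        // keep only the newest pair while downstream is busy
     .observeOn(Schedulers.computation(), false, 1) // delayError = false, bufferSize = 1
     .map(pair -> compute(pair))                    // the long-running conversion, off the source threads
     .observeOn(AndroidSchedulers.mainThread())     // deliver results on the UI thread
     ...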
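The effect of onBackpressureLatest() followed by a one-item observeOn can be pictured as a single slot that producers overwrite and the worker empties when it is ready for more. The following is a minimal plain-Java sketch of that idea, not RxJava's actual implementation; LatestSlot, offer and poll are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

public class LatestSlot<T> {

    // Holds at most one pending value; null means "nothing pending".
    private final AtomicReference<T> slot = new AtomicReference<>();

    /**
     * Producer side: store the value, overwriting any value that has not
     * been consumed yet. Returns true if an older value was dropped.
     */
    public boolean offer(T value) {
        return slot.getAndSet(value) != null;
    }

    /** Consumer side: take the newest pending value, or null if there is none. */
    public T poll() {
        return slot.getAndSet(null);
    }

    public static void main(String[] args) {
        LatestSlot<String> pending = new LatestSlot<>();
        // Three pairs arrive while the worker is still busy.
        pending.offer("pair-1");
        pending.offer("pair-2");
        pending.offer("pair-3");
        // The worker finishes and asks for more: only the newest pair is left.
        System.out.println(pending.poll()); // pair-3
        System.out.println(pending.poll()); // null
    }
}
```

Because the worker only polls after it has finished the previous item, stale pairs are overwritten without ever being computed, which is the behaviour the question asks for.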