Tags: retrofit, rx-java, rx-android, rx-java3

Multiple chained API calls to fetch data, but doOnNext of PublishSubject is never reached


I'm having trouble understanding a chained RxJava/Retrofit API call. I got inspired by this and implemented a class named ObservationLoader that loads the data from the API bucket by bucket. When the end of the data is reached, the API sends endOfRecords=true:

public Observable<PageObject<Observation>> getAllObservationDataByRegion(long taxonKey,
                                                                         String regionId) {
    final PublishSubject<PageObject<Observation>> subject = PublishSubject.create();
    return subject.doOnSubscribe(disposable -> {
                this.getData(taxonKey, regionId, 0).subscribe(subject);
            })
            .doOnNext(observationPageObject -> {
                if (observationPageObject.isEndOfRecords()) {
                    // -> list is completely loaded
                    subject.onComplete();
                } else {
                    int nextOffset = observationPageObject.getOffset() + 1;
                    this.getData(taxonKey, regionId, nextOffset).subscribe(subject);
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

private Observable<PageObject<Observation>> getData(long id,
                                                    String regionId,
                                                    int offset) {
    // Get your API response value
    return this.api.getObservations(id, regionId, ObservationLoader.PAGE_LIMIT, offset);
}

In my Android fragment HomeFragment I subscribe to the ObservationLoader:

ObservationLoader loader = new ObservationLoader(this.getApi());
Observable<PageObject<Observation>> observable = loader
    .getAllObservationDataByRegion(this.getSelectedSpecies(), this.getSelectedRegion());
observable.subscribe(new Observer<PageObject<Observation>>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "ON_SUBSCRIBE");
    }

    @Override
    public void onNext(PageObject<Observation> observationPageObject) {
        Log.i(TAG, "ON_NEXT");
        
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "ERROR = " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "COMPLETED");
    }
});

I can see that onSubscribe() and doOnSubscribe() are called, and even getData() is reached. I assume the API is responding correctly (a previous attempt with recursion worked fine). But doOnNext is never reached: the observer goes straight to onComplete() and no data is received. What could be the reason?


Solution

  • When doOnSubscribe runs, the subject doesn't see any consumers yet, so if getData is synchronous, the first results are dropped and nothing is left to trigger further results. Also, when getData ends, it completes the subject, so the next getData call in doOnNext pushes to an already terminated subject, which ignores all data.

    You'll need a differently organized feedback loop:

    // we loop back the nextOffset, in a thread-safe manner
    Subject<Integer> subject = PublishSubject.<Integer>create()
                                             .toSerialized();
    
    // bootstrap with 0 and keep open for more offsets
    subject.mergeWith(Observable.just(0))
    // get the data for the current offset
    .concatMap(nextOffset -> getData(taxonKey, regionId, nextOffset)
                             .subscribeOn(Schedulers.io())
    )
    // if the response is end of records, stop
    // (note: takeWhile does not emit that final page itself)
    .takeWhile(observationPageObject -> !observationPageObject.isEndOfRecords())
    // otherwise not end of records, feedback the new offset
    .doOnNext(observationPageObject -> 
                             subject.onNext(observationPageObject.getOffset() + 1)
    )
    // get the data on the main thread
    .observeOn(AndroidSchedulers.mainThread());
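
The failure mode described at the start of this answer can be reproduced without RxJava. The sketch below is only an illustration: MiniSubject is a hypothetical stand-in, not RxJava's PublishSubject, and it mimics just three behaviours that matter here (items emitted while nobody is subscribed are lost, items emitted after completion are ignored, and a late subscriber only receives the completion signal). It assumes the first getData call runs synchronously, so it finishes before the downstream observer is attached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a hot subject. This is NOT RxJava's PublishSubject;
// it only mimics the behaviours relevant to the question.
final class MiniSubject<T> {
    private final List<Consumer<T>> nextHandlers = new ArrayList<>();
    private final List<Runnable> completeHandlers = new ArrayList<>();
    private boolean terminated;

    void subscribe(Consumer<T> onNext, Runnable onComplete) {
        if (terminated) {
            // A late subscriber only sees the terminal event.
            onComplete.run();
            return;
        }
        nextHandlers.add(onNext);
        completeHandlers.add(onComplete);
    }

    void onNext(T item) {
        if (terminated) {
            return; // items arriving after completion are ignored
        }
        // With no handlers registered, the item is simply lost.
        for (Consumer<T> handler : new ArrayList<>(nextHandlers)) {
            handler.accept(item);
        }
    }

    void onComplete() {
        if (terminated) {
            return;
        }
        terminated = true;
        for (Runnable handler : new ArrayList<>(completeHandlers)) {
            handler.run();
        }
        nextHandlers.clear();
        completeHandlers.clear();
    }
}

final class SubjectDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSubject<String> subject = new MiniSubject<>();

        // Like getData(...).subscribe(subject) inside doOnSubscribe with a
        // synchronous source: the page and the completion arrive first.
        subject.onNext("page 0");
        subject.onComplete();

        // Only now does the downstream observer attach.
        subject.subscribe(page -> log.add("ON_NEXT " + page),
                          () -> log.add("COMPLETED"));

        // Any follow-up request would push into the terminated subject.
        subject.onNext("page 1");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [COMPLETED]
    }
}
```

The log contains only COMPLETED, which matches the symptom in the question. The feedback loop above avoids this because the subject carries offsets rather than pages and is never completed by an inner getData call.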