Tags: android, rx-java, retrofit, retrofit2, reactivex

RxJava - Repeat API calls until all item returned


I have an API call that returns a list of items page by page. I use Retrofit to implement it, and the interface is:

Observable<QueryResult> queryData(@Body QueryParams params);

QueryParams and QueryResult are defined as:

class QueryParams {
    int pageIndex, pageSize; // for pagination;
    ... // other query criteria
}

class QueryResult {
    int pageIndex, pageSize;
    int totalCount; // Total number of items; used to tell whether there is still data to retrieve.
    ... // List of data returned by page;
}

I use this code to get the first page of 100 items:

params.pageIndex = 1;
params.pageSize = 100;
queryData(params).subscribe(...);

The API is designed to return the data page by page so that I can update the UI efficiently.

However, in some cases I need to get all the data at once and process it before showing it in the UI. With the interface designed like this, I have to call queryData() repeatedly until all the data is fetched, or at least twice (the first call to get totalCount, which I then pass as pageSize for the second call).

So my question is: how do I chain these API calls the RxJava way to get all the data?

Thanks in advance.

Update: a solution based on @Abu's answer

Observable<QueryResult> query(final QueryParams params) {
    return queryData(params)
            .concatMap(new Func1<QueryResult, Observable<QueryResult>>() {
                @Override
                public Observable<QueryResult> call(final QueryResult result) {
                    // Items received so far, assuming page indexes start at 1.
                    int retrievedCount = result.getPageSize() * (result.getPageIndex() - 1) + result.resultList.size();
                    if (retrievedCount >= result.getCount()) {
                        // Last page reached: stop recursing.
                        return Observable.just(result);
                    }

                    // Fetch the next page recursively and merge this page into it.
                    QueryParams nextParams = params.clone();
                    nextParams.setPageIndex(result.getPageIndex() + 1);
                    return query(nextParams).map(new Func1<QueryResult, QueryResult>() {
                        @Override
                        public QueryResult call(QueryResult nextResult) {
                            // Prepend so the merged list stays in page order.
                            nextResult.resultList.addAll(0, result.resultList);
                            return nextResult;
                        }
                    });
                }
            });
}
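
The stopping test in the code above is plain arithmetic and can be checked on its own. Here is a minimal sketch, assuming page indexes start at 1 as in my first call; the class and method names (PageMath, retrievedCount, pageCount) are made up for illustration and are not part of my real code:

```java
class PageMath {
    // Items received once the given 1-based page has arrived.
    static int retrievedCount(int pageIndex, int pageSize, int itemsOnPage) {
        return pageSize * (pageIndex - 1) + itemsOnPage;
    }

    // Number of pages needed to cover totalCount items (ceiling division).
    static int pageCount(int totalCount, int pageSize) {
        return (totalCount + pageSize - 1) / pageSize;
    }

    public static void main(String[] args) {
        int totalCount = 250;
        int pageSize = 100;
        System.out.println(pageCount(totalCount, pageSize));                 // 3
        System.out.println(retrievedCount(2, pageSize, 100) >= totalCount);  // false, keep going
        System.out.println(retrievedCount(3, pageSize, 50) >= totalCount);   // true, stop
    }
}
```

With 250 items and a page size of 100, the recursion should therefore stop after the third call.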

Solution

  • One option is to do it recursively with the concatMap and concatWith operators.

    Here is some sample code.

        private Observable<List<Integer>> getResponse(final int index) {
            return getData(index)
                    .concatMap(new Function<List<Integer>, ObservableSource<? extends List<Integer>>>() {
                        @Override
                        public ObservableSource<? extends List<Integer>> apply(List<Integer> integers) throws Exception {
                            if (index == 10) {
                                // Last page: emit it and stop recursing.
                                return Observable.just(integers);
                            } else {
                                // Emit this page, then continue with the next one.
                                return Observable.just(integers)
                                        .concatWith(getResponse(index + 1));
                            }
                        }
                    });
        }

        // Stand-in for the network call: returns ten fake items for the given page.
        private Observable<List<Integer>> getData(int index) {
            List<Integer> dataList = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                dataList.add(index * 10 + i);
            }
            return Observable.just(dataList);
        }
    

    Usage:

            getResponse(1)
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        Log.i(TAG, "Data: " + Arrays.toString(integers.toArray()));
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        // getMessage() may be null, so log the throwable itself.
                        Log.e(TAG, "Failed to load data", throwable);
                    }
                });
    

    This emits all the data recursively and in order: first the list for index 1, then index 2, and so on.

    If there is a better solution, I would be glad to see it.

    Edit:

    To get the complete list of data in one emission, you can update your code this way:

        private Observable<List<Integer>> getResponse(final int index) {
            return getData(index)
                    .concatMap(new Function<List<Integer>, ObservableSource<? extends List<Integer>>>() {
                        @Override
                        public ObservableSource<? extends List<Integer>> apply(final List<Integer> integerList) throws Exception {
                            // Starting from getResponse(1), this fetches pages 1 through 9.
                            if (index < 9) {
                                return getResponse(index + 1)
                                        .map(new Function<List<Integer>, List<Integer>>() {
                                            @Override
                                            public List<Integer> apply(List<Integer> integers) throws Exception {
                                                // Merge this page into the pages fetched after it.
                                                integers.addAll(integerList);
                                                return integers;
                                            }
                                        });
                            } else {
                                return Observable.just(integerList);
                            }
                        }
                    });
        }

        // Stand-in for the network call: returns ten fake items for the given page.
        private Observable<List<Integer>> getData(int index) {
            Util.printThreadInfo(index);

            final List<Integer> dataList = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                dataList.add(index * 10 + i);
            }
            return Observable.just(dataList);
        }
    

    Usage:

        Observable.defer(new Callable<ObservableSource<? extends List<Integer>>>() {
                    @Override
                    public ObservableSource<? extends List<Integer>> call() throws Exception {
                        return getResponse(1);
                    }
                })
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        // The merged list is not in page order, so sort it first.
                        Collections.sort(integers);
                        Log.i(TAG, "Data: " + Arrays.toString(integers.toArray()));
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        // getMessage() may be null, so log the throwable itself.
                        Log.e(TAG, "Failed to load data", throwable);
                    }
                });
    

    This gives you the complete data in a single emission.
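
    The Collections.sort call in the usage matters because the merge appends each page after the pages that follow it. Here is a synchronous mirror of the same recursion, with the RxJava operators replaced by plain method calls (a simplification for illustration only; MergeOrder is a made-up class name):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class MergeOrder {
    // Same fake data as getData(index): ten numbers per page.
    static List<Integer> getData(int index) {
        List<Integer> dataList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            dataList.add(index * 10 + i);
        }
        return dataList;
    }

    // Blocking equivalent of the recursive merge: fetch this page,
    // fetch the rest, then append this page to the rest.
    static List<Integer> getResponse(int index) {
        List<Integer> current = getData(index);
        if (index < 9) {
            List<Integer> rest = getResponse(index + 1);
            rest.addAll(current);
            return rest;
        }
        return current;
    }

    public static void main(String[] args) {
        List<Integer> all = getResponse(1);
        System.out.println(all.size());              // 90 (pages 1 through 9)
        System.out.println(all.get(0));              // 90, the last page comes first
        System.out.println(all.get(all.size() - 1)); // 19, the first page comes last
        Collections.sort(all);
        System.out.println(all.get(0));              // 10 after sorting
    }
}
```

    If the items have no natural sort order, inserting the current page at the front of the merged list instead of appending it would keep the pages in order.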

    I think you shouldn't fetch all the data this way, because if the data spans 100 pages you end up making 100 network calls. Your API should give you all the data in a single call.
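
    If the endpoint accepts a pageSize as large as totalCount (an assumption; many servers cap the page size), the two-call idea from the question keeps the number of requests constant. A rough sketch against an in-memory stand-in for the endpoint; TwoCallFetch, Page, and query are hypothetical names, not the real Retrofit interface:

```java
import java.util.ArrayList;
import java.util.List;

class TwoCallFetch {
    // Hypothetical response: total item count plus the items of one page.
    static final class Page {
        final int totalCount;
        final List<Integer> items;

        Page(int totalCount, List<Integer> items) {
            this.totalCount = totalCount;
            this.items = items;
        }
    }

    // Counts simulated requests so the cost is visible.
    static int calls = 0;

    // In-memory stand-in for the paged endpoint (1-based pageIndex).
    static Page query(List<Integer> backend, int pageIndex, int pageSize) {
        calls++;
        int from = Math.min((pageIndex - 1) * pageSize, backend.size());
        int to = Math.min(from + pageSize, backend.size());
        return new Page(backend.size(), new ArrayList<>(backend.subList(from, to)));
    }

    // First call learns totalCount; a second call is made only if needed.
    static List<Integer> fetchAll(List<Integer> backend, int firstPageSize) {
        Page first = query(backend, 1, firstPageSize);
        if (first.items.size() >= first.totalCount) {
            return first.items;
        }
        return query(backend, 1, first.totalCount).items;
    }

    public static void main(String[] args) {
        List<Integer> backend = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            backend.add(i);
        }
        List<Integer> all = fetchAll(backend, 100);
        System.out.println(all.size()); // 250
        System.out.println(calls);      // 2
    }
}
```

    The first page is downloaded twice in this sketch, which is the price of keeping the request count at two.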

    I have updated my answer to show how this can be done.