Search code examples
androidretrofit2rx-javarx-android

How to make multiple request and responses from all the requests in Retrofit, RxJava, Android


I have a scenario where I want to call same API for multiple devices and display result after completing all requests. I am using retrofit 2. I know little bit about RxJava. I thought zip operator will be suitable for this. So implemented as below.

API in ApiInterface :

@PUT(AppConstants.BASE_URL + AppConstants.PATH_SEPARATOR + "/user/endpoint")
Observable<ResponseBody> updateInfo(@Header("Authorization") String token, @Query("device_id") String deviceId, @Body JsonObject body);

Here is a method which calls API. It gets device id and its body in Map. This method calls API for every device id available in Map.

public void updateAllInfo(final HashMap<String, String> deviceIdMap, final ApiResponseListener listener) {

    List<Observable<ResponseBody>> requests = new ArrayList<>();
    ArrayList<String> reqIdList = new ArrayList<>();

    for (Map.Entry<String, String> entry : map.entrySet()) {

        String deviceId = entry.getKey();
        String jsonBodyStr = entry.getValue();
        Gson gson = new Gson();
        JsonObject jsonBody = gson.fromJson(jsonBodyStr, JsonObject.class);

        reqIdList.add(deviceId);
        requests.add(apiInterface.updateSchedules("accessToken", deviceId, jsonBody));
    }

    Observable.zip(requests, new Function<Object[], List<ResponseBody>>() {

        @Override
        public List<ResponseBody> apply(Object[] objects) throws Exception {

            Log.e("onSubscribe", "apply : " + objects.length);
            List<ResponseBody> dataResponses = new ArrayList<>();
            for (Object o : objects) {
                dataResponses.add((ResponseBody) o);
            }
            return dataResponses;
        }
    })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<ResponseBody>>() {

                @Override
                public void accept(List<ResponseBody> responseBodies) throws Exception {
                    Log.e("onSubscribe", "YOUR DATA IS HERE: " + responseBodies.size());
                    for (int i = 0; i < responseBodies.size(); i++) {
                        Log.e(TAG, "Response received for " + i + " is : " + responseBodies.get(i).string());
                    }
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.e("onSubscribe", "Throwable: " + throwable);
                }
            });
}

I want to get the response (success / failure) for every device id. Means I need response and also id for which API is called. Using zip operator, if any API is failed, failure is received in accept(Throwable throwable) method. If any API is failed, I think zip operator is not calling next API.

How can I get response (success or failure) for all request ?

Also need something to indicate the response is for which req / device id (Some mapping) to display result.

Is there any other operator I can use instead of zip ?

Any suggestion / help ?


Solution

  • I am a bit rusty in java, so I will write my answer in Kotlin, it should not be a problem for you to convert it yourself.

    Create a helper class that will include the ResponseBody alongside with the deviceId:

    data class IdentifiedResponseBody(
        val deviceId: String, 
        val responseBody: ResponseBody?
    )
    

    Then:

    // change the signature of your requests list to return IdentifiedResponseBody observables
    val requests = mutableListOf<Observable<IdentifiedResponseBody>>()
    ...
    // your stated API have updateInfo instead of updateSchedules, but I will assume they have the same signature
    requests.add(
        apiInterface.updateSchedules("accessToken", deviceId, jsonBody)
            .map { responseBody ->
                // map the added observable to return IdentifiedResponseBody
                IdentifiedResponseBody(deviceId, responseBody)
            }
            .onErrorReturn { error -> 
                // return an item here instead of throwing error, so that the other observables will still execute
                IdentifiedResponseBody(deviceId, null)
            }
    )
    

    Finally, use merge instead of zip:

    Observable.merge(requests)
            .subscribeOn(Schedulers.io())
            // it's a question if you want to observe these on main thread, depends on context of your application
            .subscribe(
                { identifiedResponse ->
                    // here you get both the deviceId and the responseBody
                    Log.d("RESPNOSE", "deviceId=${identifiedResponse.deviceId}, body=${identifiedResponse.responseBody}")
                    if (responseBody == null || responseBody.hasError()) {
                        // request for this deviceId failed, handle it
                    }
                },
                { error ->
                    Log.e("onSubscribe", "Throwable: " + error)
                }
            )
    

    See merge: http://reactivex.io/documentation/operators/merge.html

    See zip: http://reactivex.io/documentation/operators/zip.html

    You should see the profound difference: zip combines your responses to one single item defined by your mapping function (i.e. list of responses in your case), while merge emits all responses individually, at the time they are returned. In case of zip here, the combined result is returned at the moment (and only) when all the requests have finished; you may not want this behavior, as if a single request would not return a response, you would not get any response at all.

    UPDATE

    The java equivalent should be as follow, but revise before trying out, as I am not sure if I converted everything correctly:

    requests.add(
        apiInterface.updateSchedules("accessToken", deviceId, jsonBody)
                    .map(new Function<ResponseBody, IdentifiedResponseBody>() {
                        @Override
                        public IdentifiedResponseBody apply(ResponseBody responseBody) throws Exception {
                            return new IdentifiedResponseBody(deviceId, responseBody);
                        }
                    })
                    .onErrorReturn(new Function<Throwable, IdentifiedResponseBody>() {
                        @Override
                        public IdentifiedResponseBody apply(Throwable throwable) throws Exception {
                            return new IdentifiedResponseBody(deviceId, null);
                        }
                    })
            );
    
    Observable.merge(requests)
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<IdentifiedResponseBody>() {
                               @Override
                               public void accept(IdentifiedResponseBody identifiedResponseBody) throws Exception {
                                   // same logic as from kotlin part
                               }
                           },
                        new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                Log.e("onSubscribe", "Throwable: " + throwable);
                            }
                        });
    

    UPDATE 2

    In the comment you asked:

    Is there any way from which I can get final callback for all requests completed

    That's the problem with using Observable instead of Single/Completable, it just does not finish unless you explicitly close the channel or an error is thrown. In ideal context, Observable should be used for streams that continuously emits some data, for example an open channel to Room DB, as there are no telling how many time the DB will change. I admit that in your case it seems to be difficult to apply something else than Observable. There is however a workaround:

    Observable.merge(requests)
        // emits only this much items, then close this channel
        .take(requests.size.toLong())
        // executed when the channel is closed or disposed
        .doFinally {
             // todo: finalCallback
         }
        .subscribeOn(Schedulers.io())
        .subscribe(...)
    

    The code is again in Kotlin, should not be hard to transform to java. Please check what Observable.take() does: http://reactivex.io/documentation/operators/take.html