I have a scenario where I want to call the same API for multiple devices and display the results after all requests have completed. I am using Retrofit 2 and know a little about RxJava. I thought the zip operator would be suitable for this, so I implemented it as below.
API in ApiInterface:
    @PUT(AppConstants.BASE_URL + AppConstants.PATH_SEPARATOR + "/user/endpoint")
    Observable<ResponseBody> updateInfo(@Header("Authorization") String token, @Query("device_id") String deviceId, @Body JsonObject body);
Here is the method that calls the API. It receives a Map of device ID to request body and calls the API once for every device ID in the Map.
    public void updateAllInfo(final HashMap<String, String> deviceIdMap, final ApiResponseListener listener) {
        List<Observable<ResponseBody>> requests = new ArrayList<>();
        ArrayList<String> reqIdList = new ArrayList<>();
        // One Gson instance is enough for all entries
        Gson gson = new Gson();
        // Build one request per device ID (key = device ID, value = JSON body as a string)
        for (Map.Entry<String, String> entry : deviceIdMap.entrySet()) {
            String deviceId = entry.getKey();
            String jsonBodyStr = entry.getValue();
            JsonObject jsonBody = gson.fromJson(jsonBodyStr, JsonObject.class);
            reqIdList.add(deviceId);
            requests.add(apiInterface.updateSchedules("accessToken", deviceId, jsonBody));
        }
        // zip emits a single combined item once every request has emitted its response
        Observable.zip(requests, new Function<Object[], List<ResponseBody>>() {
                    @Override
                    public List<ResponseBody> apply(Object[] objects) throws Exception {
                        Log.e("onSubscribe", "apply : " + objects.length);
                        List<ResponseBody> dataResponses = new ArrayList<>();
                        for (Object o : objects) {
                            dataResponses.add((ResponseBody) o);
                        }
                        return dataResponses;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<List<ResponseBody>>() {
                    @Override
                    public void accept(List<ResponseBody> responseBodies) throws Exception {
                        Log.e("onSubscribe", "YOUR DATA IS HERE: " + responseBodies.size());
                        for (int i = 0; i < responseBodies.size(); i++) {
                            Log.e(TAG, "Response received for " + i + " is : " + responseBodies.get(i).string());
                        }
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e("onSubscribe", "Throwable: " + throwable);
                    }
                });
    }
I want to get the response (success or failure) for every device ID, so I need both the response and the ID for which the API was called.
With the zip operator, if any API call fails, the failure is delivered to the accept(Throwable throwable) method. If any call fails, I think zip does not call the next API.
How can I get the response (success or failure) for all requests?
I also need something to indicate which request / device ID each response belongs to (some mapping) so I can display the result.
Is there any other operator I can use instead of zip?
Any suggestions or help?
I am a bit rusty in Java, so I will write my answer in Kotlin; it should not be a problem for you to convert it yourself.
Create a helper class that holds the ResponseBody alongside the deviceId:
    data class IdentifiedResponseBody(
        val deviceId: String,
        val responseBody: ResponseBody?
    )
Then:
    // change the type of your requests list so it holds IdentifiedResponseBody observables
    val requests = mutableListOf<Observable<IdentifiedResponseBody>>()
    ...
    // your stated API has updateInfo instead of updateSchedules, but I will assume they have the same signature
    requests.add(
        apiInterface.updateSchedules("accessToken", deviceId, jsonBody)
            .map { responseBody ->
                // map the added observable to return IdentifiedResponseBody
                IdentifiedResponseBody(deviceId, responseBody)
            }
            .onErrorReturn { error ->
                // return an item here instead of propagating the error, so that the other observables still execute
                IdentifiedResponseBody(deviceId, null)
            }
    )
Finally, use merge instead of zip:
    Observable.merge(requests)
        .subscribeOn(Schedulers.io())
        // whether you want to observe these on the main thread depends on the context of your application
        .subscribe(
            { identifiedResponse ->
                // here you get both the deviceId and the responseBody
                Log.d("RESPONSE", "deviceId=${identifiedResponse.deviceId}, body=${identifiedResponse.responseBody}")
                if (identifiedResponse.responseBody == null) {
                    // request for this deviceId failed (onErrorReturn supplied the null body), handle it
                }
            },
            { error ->
                Log.e("onSubscribe", "Throwable: $error")
            }
        )
See merge: http://reactivex.io/documentation/operators/merge.html
See zip: http://reactivex.io/documentation/operators/zip.html
You should see the profound difference: zip combines your responses into one single item defined by your mapping function (i.e. a list of responses in your case), while merge emits all responses individually, at the time they are returned. With zip here, the combined result is returned only once all the requests have finished; you may not want this behavior, because if a single request fails to return a response, you would not get any response at all.
UPDATE
The Java equivalent should be as follows, but review it before trying it out, as I am not sure I converted everything correctly:
    requests.add(
            apiInterface.updateSchedules("accessToken", deviceId, jsonBody)
                    .map(new Function<ResponseBody, IdentifiedResponseBody>() {
                        @Override
                        public IdentifiedResponseBody apply(ResponseBody responseBody) throws Exception {
                            return new IdentifiedResponseBody(deviceId, responseBody);
                        }
                    })
                    .onErrorReturn(new Function<Throwable, IdentifiedResponseBody>() {
                        @Override
                        public IdentifiedResponseBody apply(Throwable throwable) throws Exception {
                            return new IdentifiedResponseBody(deviceId, null);
                        }
                    })
    );
    Observable.merge(requests)
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<IdentifiedResponseBody>() {
                @Override
                public void accept(IdentifiedResponseBody identifiedResponseBody) throws Exception {
                    // same logic as in the Kotlin part
                }
            },
            new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.e("onSubscribe", "Throwable: " + throwable);
                }
            });
UPDATE 2
In the comment you asked:
Is there any way from which I can get final callback for all requests completed
That is the problem with using Observable instead of Single/Completable: it is not guaranteed to finish; it ends only when the source completes, the channel is explicitly closed, or an error is thrown. Ideally, Observable should be used for streams that continuously emit data, for example an open channel to a Room DB, as there is no telling how many times the DB will change. I admit that in your case it seems difficult to apply anything other than Observable. There is, however, a workaround:
    Observable.merge(requests)
        // emit only this many items, then close the channel
        .take(requests.size.toLong())
        // executed when the channel is closed or disposed
        .doFinally {
            // todo: finalCallback
        }
        .subscribeOn(Schedulers.io())
        .subscribe(...)
The code is again in Kotlin, but it should not be hard to convert to Java. Please check what Observable.take() does: http://reactivex.io/documentation/operators/take.html
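If you want to try the overall pattern without setting up Retrofit and RxJava, here is a minimal sketch that uses only the JDK's CompletableFuture. To be clear, this is a swapped-in stand-in and not the RxJava API from the answer: thenApply plays roughly the role of map, exceptionally plays roughly the role of onErrorReturn, and allOf gives a single final callback once every request has finished. The names DeviceResultsSketch, IdentifiedResult, fakeRequest and the device IDs are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class DeviceResultsSketch {

    /** Pairs a device ID with its response body; body is null when the request failed. */
    static final class IdentifiedResult {
        final String deviceId;
        final String body;

        IdentifiedResult(String deviceId, String body) {
            this.deviceId = deviceId;
            this.body = body;
        }
    }

    /** Hypothetical stand-in for the network call: fails for IDs starting with "bad". */
    static CompletableFuture<String> fakeRequest(String deviceId) {
        return CompletableFuture.supplyAsync(() -> {
            if (deviceId.startsWith("bad")) {
                throw new IllegalStateException("request failed for " + deviceId);
            }
            return "ok:" + deviceId;
        });
    }

    /** Starts one request per device; the returned future completes after all have finished. */
    static CompletableFuture<Map<String, IdentifiedResult>> updateAll(List<String> deviceIds) {
        List<CompletableFuture<IdentifiedResult>> pending = new ArrayList<>();
        for (String deviceId : deviceIds) {
            pending.add(fakeRequest(deviceId)
                    // similar to map: attach the device ID to the successful response
                    .thenApply(body -> new IdentifiedResult(deviceId, body))
                    // similar to onErrorReturn: turn a failure into a value instead of an error
                    .exceptionally(error -> new IdentifiedResult(deviceId, null)));
        }
        // allOf completes once every future is done; none can fail thanks to exceptionally
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    Map<String, IdentifiedResult> byDevice = new LinkedHashMap<>();
                    for (CompletableFuture<IdentifiedResult> future : pending) {
                        IdentifiedResult result = future.join(); // already completed here
                        byDevice.put(result.deviceId, result);
                    }
                    return byDevice;
                });
    }

    public static void main(String[] args) {
        // join() blocks until the "final callback" stage has run
        Map<String, IdentifiedResult> results =
                updateAll(Arrays.asList("dev-1", "bad-2", "dev-3")).join();
        for (IdentifiedResult r : results.values()) {
            System.out.println(r.deviceId + " -> " + (r.body == null ? "FAILED" : r.body));
        }
    }
}
```

The point is the same as in the merge version: every request yields a value tagged with its device ID, a failed request shows up as a null body instead of cancelling the rest, and there is one place that runs after everything has finished.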