I need to send a list of objects to my remote server. Because there may be many of them and they may be large, I use a Flowable to send them one at a time from an ArrayList, using request(1).
For each object, a Retrofit call is made to the server. The response contains the remote ID, and I update the local object with it.
I need to detect the end of this task, i.e. the response for the last object sent, so that I can prevent multiple concurrent syncs of the same objects.
Everything works so far, except that I get the "completed" message before the response arrives from the remote server, and therefore before the object is updated.
How can I get notified only after the last response has been processed?
Flowable<Integer> observable = Flowable.range(0, objList.size());
observable.subscribe(new DefaultSubscriber<Integer>() {
    @Override
    public void onStart() {
        Log.d(TAG, "on start");
        request(1);
    }

    @Override
    public void onNext(Integer t) {
        Log.d(TAG, "on next : " + t);
        MyObj obj = objList.get(t);
        // The inner call is subscribed on its own, so the outer Flowable does not wait for it.
        RetrofitHelper.createService(ObjService.class, true, authType, authToken)
                .createOrUpdateObj(obj)
                .flatMap(p -> {
                    Log.d(TAG, "received p");
                    if (p != null) {
                        try {
                            p.setSyncho(true);
                            // save remote id on obj
                            ObjDB.updateObj(p);
                            request(1);
                            return Observable.empty();
                        } catch (Throwable th) {
                            ExceptionHandler.logException(th);
                            return Observable.error(th);
                        }
                    } else {
                        request(1);
                        return Observable.empty(); // temporary: return empty if there is a problem
                    }
                })
                .onErrorResumeNext(r -> {
                    request(1);
                    return Observable.empty();
                })
                .onExceptionResumeNext(Observable.empty()) // go to next on error
                .subscribeOn(Schedulers.io())
                .onErrorReturn(error -> {
                    Log.d("ERROR", error.getMessage());
                    return 0;
                })
                .onErrorResumeNext(Observable.empty())
                .subscribe();
    }

    @Override
    public void onError(Throwable t) {
        Log.e("XXX ERROR ", "" + t);
        request(1);
        patientSynchroInProgress = Boolean.FALSE;
    }

    @Override
    public void onComplete() {
        // Called as soon as range() has emitted its last index,
        // even if the last Retrofit call is still in flight.
        Log.e("XXX COMPLETE", "complete");
    }
});
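Finally, I was able to get it to work. concatMap subscribes to one inner call at a time and completes only after the last inner call has completed, so the "COMPLETE" log now appears after the final response: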
Flowable.fromIterable(patientList)
        .concatMap(item -> {
            item.setSomething();
            // concatMap waits for this inner stream to complete before starting the next item
            return RetrofitHelper.createService(ObjService.class, true, authType, authToken)
                    .createOrUpdateObj(item)
                    .flatMap(p -> {
                        if (p != null) {
                            try {
                                p.setSyncho(true);
                                // save remote id on obj
                                ObjDB.updateObj(p);
                                return Observable.empty();
                            } catch (Throwable th) {
                                ExceptionHandler.logException(th);
                                return Observable.error(th);
                            }
                        } else {
                            return Observable.empty(); // temporary: return empty if there is a problem
                        }
                    })
                    .onErrorResumeNext(Observable.empty())
                    .toFlowable(BackpressureStrategy.BUFFER);
        })
        .doOnNext(s -> {
            // not reached as written: every inner stream completes without emitting
            Log.d(TAG, ((Obj) s).toString());
        })
        .doOnComplete(() -> {
            // do something when completed
            Log.d(TAG, "COMPLETE");
        })
        .subscribe();
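For anyone who wants to see the ordering difference without RxJava or Retrofit on the classpath, here is a minimal JDK-only sketch. It is not the code above: `SequentialSyncDemo`, `fakeRemoteCall`, `fireAndForget` and `chained` are made-up names, and `CompletableFuture` is only a stand-in for the Retrofit Observable. The first method mimics starting each call inside onNext without waiting for it. The second mimics what concatMap does, assuming each call must finish before the next one starts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class SequentialSyncDemo {

    // Hypothetical stand-in for the Retrofit call: answers asynchronously after 100 ms.
    static CompletableFuture<String> fakeRemoteCall(String item) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "remote-id-of-" + item;
        });
    }

    // Like subscribing inside onNext: every call is started, nobody waits for it,
    // and "COMPLETE" is logged as soon as the loop is done.
    static List<String> fireAndForget(List<String> items) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String item : items) {
            pending.add(fakeRemoteCall(item).thenAccept(events::add));
        }
        events.add("COMPLETE");
        // Wait here only so the demo can return the full event log.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return new ArrayList<>(events);
    }

    // Like concatMap: each call starts only after the previous response was handled,
    // and "COMPLETE" is logged after the last response.
    static List<String> chained(List<String> items) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String item : items) {
            chain = chain.thenCompose(ignored -> fakeRemoteCall(item).thenAccept(events::add));
        }
        chain.thenRun(() -> events.add("COMPLETE")).join();
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c");
        System.out.println("fire and forget: " + fireAndForget(items));
        System.out.println("chained:         " + chained(items));
    }
}
```

In the chained variant "COMPLETE" is always the last entry, because each step is composed onto the previous one. In the fire-and-forget variant it normally shows up first, since the loop ends long before the simulated responses arrive. That is the same symptom as in my first version.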
Thank you for your help.