Search code examples
androidrx-java2

RxJava2 Flowable : send objects to server on by one and detect end


I need to send a list of objects to my remote server. As they may be numerous and big I use a flowable to send them one by one from an arraylist using request(1).

For each object a retrofit call is made to the server and in return I get the remote ID and I update the local object with the remote id.

I need to detect the end of this task : ie the last response for the last object sent to prevent multiple concurent calls for the same objects.

For the moment all works well but I get the "completed" message before the answer arrives from the remote server so before the object is updated.

How can I do this ?

Flowable<Integer> observable = Flowable.range(0, objList.size());

        observable.subscribe(new DefaultSubscriber<Integer>() {
            @Override
            public void onStart() {
                Log.d(TAG, "on start");
                request(1);
            }

            @Override
            public void onNext(Integer t) {
                Log.d(TAG, "on next : " + t);
                MyObj = objList.get(t);

                RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {

                    Log.d(TAG, "recu p");

                    if (p != null) {
                        try {
                            p.setSyncho(true);
                            // save remote id on obj
                            ObjDB.updateObj(p);
                            request(1);

                            return Observable.empty();
                        } catch (Throwable th) {
                            ExceptionHandler.logException(th);
                            return Observable.error(th);
                        }
                    } else {
                        request(1);
                        return Observable.empty();  // provisoirement si pb on renvoie vide
                    }
                })
                        .onErrorResumeNext(r -> {
                            request(1);
                            Observable.empty();
                        })
                        .onExceptionResumeNext(error -> Observable.empty()) // go to next on error
                        .subscribeOn(Schedulers.io()).onErrorReturn(error -> {
                    Log.d("ERROR", error.getMessage());
                    return 0;
                })

                        .onErrorResumeNext(Observable.empty()).subscribe();
            }


            @Override
            public void onError(Throwable t) {
                Log.e("XXX ERROR ", "" + t);
                request(1);
                patientSynchroInProgress = Boolean.FALSE;
            }

            @Override
            public void onComplete() {
                Log.e("XXX COMPLETE", "complete");
            }
        });

Solution

  • Finally, I was able to get it work

      Flowable.fromIterable(patientList)
                        .concatMap(item -> {
    
                                    item.setSomething();
                                    return RetrofitHelper.createService(ObjService.class, true, authType, authToken)
                                            .createOrUpdateObj(item)
                                            .flatMap(p -> {
                                                if (p != null) {
                                                    try {
                                                        p.setSyncho(true);
                                                        // save remote id on obj
                                                        ObjDB.updateObj(p);
                                                        return Observable.empty();
                                                    } catch (Throwable th) {
                                                        ExceptionHandler.logException(th);
                                                        return Observable.error(th);
                                                    }
                                                } else {
                                                    return Observable.empty();  // provisoirement si pb on renvoie vide
                                                }
                                            })
    
                                            .onErrorResumeNext(Observable.empty())
                                            .toFlowable(BackpressureStrategy.BUFFER);
                                }
                        )
                        .doOnNext(s -> {
                            Log.d(TAG, ((Obj) s).toString());
    
                        })
                        .doOnComplete(() -> {
                            // do something when completed
                            Log.d(TAG, "COMPLETE");
                        })
                        .subscribe();
            }
        }
    

    Thank you for your help