Search code examples
android realm rx-java2

Returning a subscriber in RxJava after storing data fetched from a web service


I am trying to call a web service to fetch data and store it in a database using the following code. I have created a separate class to perform this operation.

Now, the issue is that I want to notify my activity when I have successfully fetched and stored the data in the database. If an error occurs, I want to show it on the UI itself.

So far I have been able to write code that fetches the data using pagination, but I am not sure how to notify the UI so that it can subscribe to updates about progress and any errors.

/**
 * Fetches fitness data from the web service and stores every non-empty result in Realm.
 *
 * <p>The request is started immediately by an internal subscriber that persists and logs each
 * response. The same stream is returned (instead of {@code null}) so that a caller such as an
 * activity can subscribe to it and react to items, the error or completion. The stream is
 * cached, so a caller that subscribes after the request has started still receives every signal.
 *
 * @return the shared, never-null stream of responses
 */
public Flowable<Response> getFitnessData() {

        // NOTE(review): the access token and the id below are hard-coded; they should come
        // from configuration or secure storage rather than from source code.
        Request request = new Request();
        request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");

        Flowable<Response> fitnessFlowable = new WebRequest()
                                            .getRemoteClient()
                                            .create(FitnessApi.class)
                                            .getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());

        // cache() lets the internal subscriber and the caller share ONE upstream request.
        Flowable<Response> sharedFlowable = fitnessFlowable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                // NOTE(review): this completes the stream as soon as a response reports a next
                // page; confirm that this is the intended pagination condition.
                .takeUntil(response->response.getSummary().getNext()!=null)
                .cache();

        sharedFlowable.subscribe(new Subscriber<Response>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        // No back-pressure handling: ask for everything up front.
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Response response) {

                        Log.e(TAG, "onNext" );

                        if(response == null){
                            Log.i(TAG, "onError , response is null" );
                            return;
                        }

                        if(response.getFitness() == null || response.getFitness().size() == 0){
                            Log.i(TAG, "onError , no fitness data available" );
                            return;
                        }

                        // The instance must stay open until the async transaction reports back,
                        // so it is closed in the callbacks rather than right after scheduling.
                        final Realm realm = Realm.getDefaultInstance();
                        realm.executeTransactionAsync(new Realm.Transaction() {
                            @Override
                            public void execute(Realm backgroundRealm) {
                                backgroundRealm.copyToRealmOrUpdate(response.getFitness());
                            }
                        }, new Realm.Transaction.OnSuccess() {
                            @Override
                            public void onSuccess() {
                                Log.i(TAG, "onSuccess , fitness data saved");
                                realm.close();
                            }
                        }, new Realm.Transaction.OnError() {
                            @Override
                            public void onError(Throwable error) {
                                Log.i(TAG, "onError , fitness data failed to save"+error.getMessage() );
                                realm.close();
                            }
                        });
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.e(TAG, "onError" +t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "onComplete");
                    }
                });

        return sharedFlowable;

    }

Updated

I created an RxBus to propagate events and errors to the UI:

/**
 * Singleton event bus: everything passed to {@link #send(Object)} or {@link #error(Throwable)}
 * is pushed into one shared {@link PublishSubject} that observers reach through
 * {@link #toObservable()}.
 */
public class RxBus {

    private static final RxBus INSTANCE = new RxBus();

    /** The subject every event and error is routed through. */
    private PublishSubject<Object> bus = PublishSubject.create();

    /** Not instantiable from outside; use {@link #getInstance()}. */
    private RxBus() {
    }

    /** @return the single shared bus instance */
    public static RxBus getInstance() {
        return INSTANCE;
    }

    /**
     * Emits {@code event} to the subject as an {@code onNext} signal.
     *
     * @param event the object to publish
     */
    public void send(Object event) {
        bus.onNext(event);
    }

    /**
     * Emits {@code cause} to the subject as an {@code onError} signal.
     *
     * <p>NOTE(review): {@code onError} is a terminal signal in Rx, so after the first call this
     * shared subject presumably delivers no further events; confirm that this is intended for a
     * singleton bus.
     *
     * @param cause the failure to publish
     */
    public void error(Throwable cause) {
        bus.onError(cause);
    }

    /** @return the underlying subject, exposed as an {@link Observable} */
    public Observable<Object> toObservable() {
        return bus;
    }
}

In the activity:

 FitnessRepo fitnessRepo = new FitnessRepo();

        // Subscribe BEFORE starting the fetch: RxBus is backed by a PublishSubject, which does
        // not replay, so anything emitted before this subscription would be lost.
        RxBus.getInstance().toObservable().subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                // NOTE(review): the Disposable is dropped here, so this observer (and the
                // activity it captures) is never unsubscribed; keep it and dispose it when
                // the activity is destroyed.
            }

            @Override
            public void onNext(Object o) {

                // Only fitness lists are handled; every other event type is ignored.
                if(o instanceof RealmList ){

                    RealmList<Fitness> realmList = (RealmList<Fitness>) o;
                    Log.e(TAG,"Fitness data size "+realmList.size());

                }
            }

            @Override
            public void onError(Throwable e) {

                Log.e(TAG,e.getMessage()+ "");

                if (!(e instanceof HttpException)) {
                    return;
                }

                // Both the wrapped response and its error body may be absent.
                HttpException httpException = (HttpException) e;
                if (httpException.response() == null) {
                    return;
                }
                ResponseBody body = httpException.response().errorBody();
                if (body == null) {
                    return;
                }

                try {

                    // The server reports failures in the same JSON shape as a regular response.
                    Response response=  LoganSquare.parse(body.byteStream(),Response.class);

                    if(response != null && response.getErrors() !=null && response.getErrors().size() > 0) {
                        Log.e(TAG, "Error "+response.getErrors().get(0).getErrors());
                    }
                } catch (IOException t) {
                    Log.e(TAG, "Failed to parse error body", t);
                }
            }

            @Override
            public void onComplete() {

            }
        });

        // Start the request only once the observer above is in place.
        fitnessRepo.getFitnessData();

In the web service call:

/**
 * Fetches fitness data on the io scheduler, stores each non-empty result in Realm and reports
 * the outcome through {@link RxBus}.
 *
 * <p>A fitness list is published on the bus only after its Realm transaction has finished, so
 * observers are never told about data that was not stored. Any failure of the request or of
 * the transaction is forwarded through {@code RxBus.error}.
 */
public void getFitnessData() {


        // NOTE(review): the access token and the id below are hard-coded; they should come
        // from configuration or secure storage rather than from source code.
        Request request = new Request();
        request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");
        request.setEnd_date("2018-07-01T00:00:00");
        Flowable<Response> fitnessFlowable = new WebRequest()
                .getRemoteClient()
                .create(FitnessApi.class)
                .getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());


         fitnessFlowable.subscribeOn(Schedulers.io())
                .takeUntil(response->response.getSummary().getNext()!=null)
                .doOnNext((response) -> {
                    if(response ==null || response.getFitness() == null || response.getFitness().isEmpty()) {


                        Log.e(TAG, " Error ");
                        return;
                    }

                    // Store first: executeTransaction is synchronous, and an exception thrown
                    // here reaches the error consumer below instead of a success event.
                    try(Realm r = Realm.getDefaultInstance()) {
                        r.executeTransaction((realm) -> {
                            realm.copyToRealmOrUpdate(response.getFitness());
                        });
                    }

                    // Notify observers only now that the data has been written.
                    RxBus.getInstance().send(response.getFitness());
                }).subscribe(item ->{

                    // Nothing to do per item; the work happens in doOnNext above.

                 },
                 error ->{

                     RxBus.getInstance().error(error);


                 });
    }

Solution

  • I have good news for you! You can delete almost all of that code and just make it generally better as a result!

    public void fetchFitnessData() {
    
        Request request = new Request();
        request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");
    
        Flowable<Response> fitnessFlowable = new WebRequest()
                                            .getRemoteClient()
                                            .create(FitnessApi.class)
                                            .getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());
    
    
         fitnessFlowable.subscribeOn(Schedulers.io())
                .takeUntil(response->response.getSummary().getNext()!=null)
                .doOnNext((response) -> {
                        if(response ==null || response.getFitness() == null || response.getFitness().isEmpty()) return;
    
                        try(Realm r = Realm.getDefaultInstance()) {
                            r.executeTransaction((realm) -> {
                                realm.insertOrUpdate(response.getFitness());
                            });
                        }
                    }
                }).subscribe();
    }
    

    This method is on a background thread now and returns void, so the way to emit stuff out of this method would be to use either a PublishSubject (one for success, one for failure) or an EventBus.

    /** Carries the results of the fetch; created eagerly so it is never {@code null}. */
    private PublishSubject<Object> fitnessResults = PublishSubject.create();

    /**
     * Exposes the result stream to observers.
     *
     * @return the subject that the fetch publishes its results to
     */
    public Observable<Object> observeFitnessResults() {
        return fitnessResults;
    }
    
    /** Result event that carries the fitness entries passed to its constructor. */
    public static class Success {
        /** The entries handed to the constructor, stored as given (no copy is made). */
        public List<Fitness> data;

        /**
         * @param data the fitness entries to carry
         */
        public Success(List<Fitness> data) {
            this.data = data;
        }
    }
    
    /** Result event that carries the exception passed to its constructor. */
    public static class Failure {
        /** The exception handed to the constructor, stored as given. */
        public Exception exception;

        /**
         * @param exception the exception to carry
         */
        public Failure(Exception exception) {
            this.exception = exception;
        }
    }
    
    public void fetchFitnessData() {
        ...
            fitnessResults.onNext(new Success(data));
        } catch(Exception e) {
            fitnessResults.onNext(new Failure(e));
    

    And then

    // Failures are published as Failure events, so filter on that type; Error.class would
    // resolve to java.lang.Error and never match.
    errors = observeFitnessResults().ofType(Failure.class);
    success = observeFitnessResults().ofType(Success.class);