Search code examples
androidretrofit2rx-androidrx-java2

How to continue Subscribed to observable even after onError called


I'm newbie to RxJava and I do my learning through some samples. I have done a sample RxJava Retrofit app which will show the details of movies from http://www.omdbapi.com. The app has a searchBox using which the user inputs a movie name, which will be fetched and sent as api request, and upon the response, the result is shown. My issue is whenever an error occur, after the onError, the edittext observable doesn't emit anything anymore. So, basically if one movie search fails due to any API error, I need to close and re-launch the app in order to continue with the movie search. How can I observe to edittext changes even after the onError? Below is my code:

public class MainActivity extends AppCompatActivity implements SearchView.OnQueryTextListener {

private static final String TAG = "LOG";
@Inject
ApiInterface mApiInterface;
private MainActivityViewHelper mMainActivityViewHelper;
BehaviorSubject<String> mStringSubject = BehaviorSubject.create();
private ViewModel mVm;
private Observer<MovieData> mMovieDataObserver;


@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);

    //setting up views / databining and dagger
    mVm = new ViewModel();
    ActivityMainBinding binding = DataBindingUtil.setContentView(this, R.layout.activity_main);
    binding.setVm(mVm);
    App.get(this).getAppComponent().inject(this);
    mMainActivityViewHelper = new MainActivityViewHelper();
    mMainActivityViewHelper.setSearchToolbar(this, this);


    searchSubscription().subscribe(mMovieDataObserver);


}

private Observable<MovieData> searchSubscription() {
    mMovieDataObserver = new Observer<MovieData>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Log.d(TAG, "onSubscribe: ");
        }

        @Override
        public void onNext(@NonNull MovieData movieData) {
            Log.d(TAG, "onNext: " + movieData);
            mVm.loading.set(false);
            mVm.moviedata.set(movieData);
        }

        @Override
        public void onError(@NonNull Throwable e) {
            Log.d(TAG, "onError: " + e.getMessage());
            searchSubscription();
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: ");
        }
    };

    Observable<MovieData> movieDataObservable = mStringSubject
            .filter(s -> s != null)
            .doOnNext(s -> Log.d(TAG, s))
            .debounce(500, TimeUnit.MILLISECONDS)
            .doOnNext(s -> Log.d(TAG, "onCreate: " + s))
            .flatMap(s -> mApiInterface.getMovie(s))
            .onErrorReturn(throwable -> null)
            .doOnSubscribe(disposable -> mVm.loading.set(true))
            .doFinally(() -> mVm.loading.set(false))
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread());

    return movieDataObservable;
}

@Override
public boolean onCreateOptionsMenu(final Menu menu) {
    getMenuInflater().inflate(R.menu.menu_home, menu);
    return true;
}

@Override
public boolean onOptionsItemSelected(MenuItem item) {
    switch (item.getItemId()) {
        case R.id.action_search:
            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.LOLLIPOP)
                Anim.circleReveal(this, R.id.searchtoolbar, 1, true, true);
            else
                mMainActivityViewHelper.mSearchToolbar.setVisibility(View.VISIBLE);
            mMainActivityViewHelper.mItem.expandActionView();
            return true;
        default:
            return super.onOptionsItemSelected(item);
    }
}

@Override
public boolean onQueryTextSubmit(String query) {
    return true;
}

@Override
public boolean onQueryTextChange(String newText) {
    mStringSubject.onNext(newText);
    return true;
  }


}

Solution

  • Here , when an error occurs in the retrofit observable, the error continues to your main observable, and then your stream stops. You can skip the error from the retrofit observable to pass to your main stream. You could make use of any error handling operators as specified here How to ignore error and continue infinite stream?

    Try to apply some error handling to your retrofit observable , which you are returning from the flatMap

    For example

    Observable<MovieData> movieDataObservable = mStringSubject
            .filter(s -> s != null)
            .doOnNext(s -> Log.d(TAG, s))
            .debounce(500, TimeUnit.MILLISECONDS)
            .doOnNext(s -> Log.d(TAG, "onCreate: " + s))
            .flatMap(s -> mApiInterface.getMovie(s).onErrorReturn(throwable -> new MovieData()))
            .doOnSubscribe(disposable -> mVm.loading.set(true))
            .doFinally(() -> mVm.loading.set(false))
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread());
    

    Here , on error , the retrofit observable will return an empty MovieData object instead of calling onError. You could check for an empty MovieData object for error case and handle accordingly