Search code examples

Retrofit2+RxJava2, Invalid token, how to update stream when retryWhen() re-subscribe

I have this simple code below that simulates a scenario Im currently trying to accomplish

mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {

                public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
                    return mApiService.api().getAccessToken();
            .subscribe(new Observer<Void>() {
                public void onSubscribe(Disposable d) {

                public void onNext(Void value) {

                public void onError(Throwable e) {


                public void onComplete() {

Ill just enumerate it to make my goal clear:

  1. perform a POST call with a current access token
  2. if it receives an appropriate error (404,403, 401 or such)
  3. perform a GET call to have a fresh access token
  4. retry the whole sequence using the new access token

based on the code above and my understanding so far with .retryWhen(), is that it will execute if an error happened on the original Observable( .postSomethingWithAccessToken()), and retrying if necessary (based on your conditions inside retry), what happens here is that the .retryWhen() executes first before the outer Observable, causing undesired duplicate request, how can I achieve those things I mentioned above, based on my current understanding(code)? Any help will be greatly appreciated. :(

Edit: Current workaround:

mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {

                public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {

                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {

                        public ObservableSource<?> apply(Throwable throwable) throws Exception {

                            if (throwable instanceof HttpException) {

                                HttpException httpException = (HttpException) throwable;

                                if (httpException.code() == 401) {

                                    return mApiService.api().getAccessToken()
                                            .doOnNext(new Consumer<Authentication>() {
                                                public void accept(Authentication authentication) throws Exception {

                            return Observable.error(throwable);
            .subscribe(new Observer<Void>() {
                public void onSubscribe(Disposable d) {
                    Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));

                public void onNext(Void value) {
                    Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));

                public void onError(Throwable e) {

                public void onComplete() {
                    Log.e("Complete", "____ COMPLETE");

Method that updates the token via shared preference

public void update(Authentication authentication) {
    preferences.edit().putString("access_token", authentication.getAccessToken()).commit();

I noticed that(i put a Log) the outer observable's subscribe and the retryWhen was executed at main thread, but the stream of retrying/resubscribing is jumping over different Scheduler's thread, it seems like a race condition :(

    onSubscrbie_outer_observable: Thread[main,5,main]
    RetryWhen: Thread[main,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
    // and so on...


  • There are few problems here:

    • you need to pass back the access token to the postSomethingWithAccessToken method when retrying, otherwise you'll just retry with the same old invalid access token.
    • your retry when logic is incorrect, you must respond to the errors Observable you get and put your retry logic there. as you were saying this method is executed first, not when error happens, the throwableObservable is what response to error, it will mirror errors as emissions (onNext()), you can flatMap() each error and response either with error (for delivering error to the source stream) complete , or with onNext() with some object to signal it to retry.
      A great blog post ban Dan Lew on this subject.

    So you need:
    1) to store the Access Token somewhere where you can change it with access token refresh.
    2) fix the retry when logic to respond properly to errors

    Here's a suggestion code:

    postSomethingWithAccessToken(request, accessToken)
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                       public ObservableSource<?> apply(
                               @NonNull Observable<Throwable> throwableObservable) throws Exception {
                           return throwableObservable.flatMap(
                                   new Function<Throwable, ObservableSource<? extends R>>() {
                                       public ObservableSource<? extends R> apply(
                                               @NonNull Throwable throwable) throws Exception {
                                           if (throwable.code == 401) { //or 404/403, just a pseudo-code, put your real error comparing logic here
                                               return getAccessToken()
                                                               .doOnNext(refreshedToken -> accessToken.updateToken(refreshedToken));
                                                       //or keep accessToken on some field, the point to have mutable
                                                       //var that you can change and postSomethingWithAccessToken can see
                                           return Observable.error(throwable);
            .subscribe(new Consumer<Result>() {
                           public void accept(@NonNull Result result) throws Exception {
                               //handle result