Search code examples
rx-javaretrofit2rx-java2reactiveresque-retry

Retrofit2+RxJava2, Invalid token, how to update stream when retryWhen() re-subscribe


I have this simple code below that simulates a scenario Im currently trying to accomplish

mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
            .subscribeOn(Schedulers.io())
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {

                @Override
                public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
                    return mApiService.api().getAccessToken();
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Observer<Void>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(Void value) {
                }

                @Override
                public void onError(Throwable e) {

                    e.printStackTrace();
                    onError(e);
                }

                @Override
                public void onComplete() {
                }
            });

Ill just enumerate it to make my goal clear:

  1. perform a POST call with a current access token
  2. if it receives an appropriate error (404,403, 401 or such)
  3. perform a GET call to have a fresh access token
  4. retry the whole sequence using the new access token

based on the code above and my understanding so far with .retryWhen(), is that it will execute if an error happened on the original Observable( .postSomethingWithAccessToken()), and retrying if necessary (based on your conditions inside retry), what happens here is that the .retryWhen() executes first before the outer Observable, causing undesired duplicate request, how can I achieve those things I mentioned above, based on my current understanding(code)? Any help will be greatly appreciated. :(

Edit: Current workaround:

mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
            .subscribeOn(Schedulers.io())
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {

                @Override
                public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {

                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {

                        @Override
                        public ObservableSource<?> apply(Throwable throwable) throws Exception {

                            if (throwable instanceof HttpException) {

                                HttpException httpException = (HttpException) throwable;

                                if (httpException.code() == 401) {

                                    return mApiService.api().getAccessToken()
                                            .doOnNext(new Consumer<Authentication>() {
                                                @Override
                                                public void accept(Authentication authentication) throws Exception {
                                                    update(authentication);
                                                }
                                            });
                                }
                            }

                            return Observable.error(throwable);
                        }
                    });
                }
            })
            .subscribe(new Observer<Void>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
                }

                @Override
                public void onNext(Void value) {
                    Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onComplete() {
                    Log.e("Complete", "____ COMPLETE");
                }
            });

Method that updates the token via shared preference

public void update(Authentication authentication) {
    preferences.edit().putString("access_token", authentication.getAccessToken()).commit();
}

I noticed that(i put a Log) the outer observable's subscribe and the retryWhen was executed at main thread, but the stream of retrying/resubscribing is jumping over different Scheduler's thread, it seems like a race condition :(

    onSubscrbie_outer_observable: Thread[main,5,main]
    RetryWhen: Thread[main,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
    // and so on...

Solution

  • There are few problems here:

    • you need to pass back the access token to the postSomethingWithAccessToken method when retrying, otherwise you'll just retry with the same old invalid access token.
    • your retry when logic is incorrect, you must respond to the errors Observable you get and put your retry logic there. as you were saying this method is executed first, not when error happens, the throwableObservable is what response to error, it will mirror errors as emissions (onNext()), you can flatMap() each error and response either with error (for delivering error to the source stream) complete , or with onNext() with some object to signal it to retry.
      A great blog post ban Dan Lew on this subject.

    So you need:
    1) to store the Access Token somewhere where you can change it with access token refresh.
    2) fix the retry when logic to respond properly to errors

    Here's a suggestion code:

    postSomethingWithAccessToken(request, accessToken)
            .subscribeOn(Schedulers.io())
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                       @Override
                       public ObservableSource<?> apply(
                               @NonNull Observable<Throwable> throwableObservable) throws Exception {
                           return throwableObservable.flatMap(
                                   new Function<Throwable, ObservableSource<? extends R>>() {
                                       @Override
                                       public ObservableSource<? extends R> apply(
                                               @NonNull Throwable throwable) throws Exception {
                                           if (throwable.code == 401) { //or 404/403, just a pseudo-code, put your real error comparing logic here
                                               return getAccessToken()
                                                               .doOnNext(refreshedToken -> accessToken.updateToken(refreshedToken));
                                                       //or keep accessToken on some field, the point to have mutable
                                                       //var that you can change and postSomethingWithAccessToken can see
                                           }
                                           return Observable.error(throwable);
                                       }
                                   });
                           }
                       }
            )
            .subscribeOn(Schedulers.io())
            .subscribe(new Consumer<Result>() {
                           @Override
                           public void accept(@NonNull Result result) throws Exception {
                               //handle result
                           }
                       }
            );