
How to not repeat the same operations in RxJava?


I have code like the following:

authRepository.login(userName, password)
        .doOnSubscribe(__ -> apiProgress.setValue(ApiProgress.start()))
        .doFinally(() -> apiProgress.setValue(ApiProgress.stop()))
        .subscribe(login -> loginData.setValue(login),
                   err -> apiError.setValue(ApiError.create(err)));

I have to repeat doOnSubscribe(..) and doFinally(..) for every API call.

Is there a way to avoid this repetition?


Solution

  • Welcome to StackOverflow! https://stackoverflow.com/conduct

    You can do this with a reusable Transformer (RxJava 1 docs: http://reactivex.io/RxJava/javadoc/rx/Single.Transformer.html; the RxJava 2 equivalent, used here, is SingleTransformer):

    static <T> SingleTransformer<T, T> subscribeAndFinalTransformer() {
        return new SingleTransformer<T, T>() {
            @Override
            public SingleSource<T> apply(Single<T> upstream) {
                return upstream.doOnSubscribe(disposable -> {
                    // Your doOnSubscribe block
                }).doFinally(() -> {
                    // Your doFinally block
                });
            }
        };
    }
    

    The reusable Transformer above can then be attached to any Single with the compose method:

    authRepository.login(userName, password)
            .compose(subscribeAndFinalTransformer())
            .subscribe();

    authRepository.anotherApi()
            .compose(subscribeAndFinalTransformer())
            .subscribe();
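
    As a rough sketch of how the empty blocks might be filled in, the start and stop actions can be passed in as parameters so that each screen supplies its own progress handling. This assumes RxJava 2 (io.reactivex packages); the class name ProgressTransformers and the methods withProgress and demo are hypothetical names made up for illustration, not part of RxJava.

```java
import io.reactivex.Single;
import io.reactivex.SingleTransformer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class; the names here are illustrative only.
final class ProgressTransformers {
    private ProgressTransformers() {}

    // Returns a transformer that runs onStart when the Single is subscribed
    // and onStop once it succeeds, fails, or is disposed.
    static <T> SingleTransformer<T, T> withProgress(Runnable onStart, Runnable onStop) {
        return upstream -> upstream
                .doOnSubscribe(disposable -> onStart.run())
                .doFinally(onStop::run);
    }

    // Small demo that records the order in which the callbacks fire.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Single.just("token")
                .compose(withProgress(() -> events.add("start"), () -> events.add("stop")))
                .subscribe(value -> events.add("value:" + value),
                           error -> events.add("error"));
        return events;
    }
}
```

    With the code from the question, the call site would presumably become something like .compose(withProgress(() -> apiProgress.setValue(ApiProgress.start()), () -> apiProgress.setValue(ApiProgress.stop()))). Note that doFinally runs after the downstream success or error callback has been invoked.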
    

    If you are using Observable or Completable, use the equivalent transformer type (ObservableTransformer or CompletableTransformer) instead of SingleTransformer.
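
    The Observable and Completable variants might look roughly like the sketch below. It assumes RxJava 2; StreamTransformers, forObservable, forCompletable and demo are hypothetical names chosen for this example.

```java
import io.reactivex.Completable;
import io.reactivex.CompletableTransformer;
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class; the names here are illustrative only.
final class StreamTransformers {
    private StreamTransformers() {}

    // Same idea as the Single version, typed for Observable.
    static <T> ObservableTransformer<T, T> forObservable(Runnable onStart, Runnable onStop) {
        return upstream -> upstream
                .doOnSubscribe(disposable -> onStart.run())
                .doFinally(onStop::run);
    }

    // Completable carries no value, so its transformer has no type parameter.
    static CompletableTransformer forCompletable(Runnable onStart, Runnable onStop) {
        return upstream -> upstream
                .doOnSubscribe(disposable -> onStart.run())
                .doFinally(onStop::run);
    }

    // Runs one Observable and one Completable and returns {starts, stops}.
    static int[] demo() {
        AtomicInteger starts = new AtomicInteger();
        AtomicInteger stops = new AtomicInteger();
        Observable.just(1, 2, 3)
                .compose(forObservable(starts::incrementAndGet, stops::incrementAndGet))
                .blockingSubscribe();
        Completable.complete()
                .compose(forCompletable(starts::incrementAndGet, stops::incrementAndGet))
                .blockingAwait();
        return new int[] {starts.get(), stops.get()};
    }
}
```

    Each stream triggers the start action once on subscription and the stop action once on termination, regardless of how many items an Observable emits.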

    EDIT:

    The approach above is convenient if you want to reuse the actions for only certain calls.

    If you want to attach the actions to all of your API calls, you can create a Retrofit CallAdapter:

    class RxStreamAdapter implements CallAdapter {
    
        private final Class rawType;
        private final CallAdapter<Object, Object> nextAdapter;
        private final Type returnType;
    
        RxStreamAdapter(Class rawType,
                        Type returnType,
                        CallAdapter nextAdapter) {
            this.rawType = rawType;
            this.returnType = returnType;
            this.nextAdapter = nextAdapter;
        }
    
        @Override
        public Type responseType() {
            return nextAdapter.responseType();
        }
    
        @Override
        public Object adapt(Call call) {
            if (rawType == Single.class) {
                return ((Single) nextAdapter.adapt(call))
                        .doOnSubscribe(getDoOnSubscribe())
                        .doFinally(getDoFinally());
            } else if (rawType == Completable.class) {
                return ((Completable) nextAdapter.adapt(call))
                        .doOnSubscribe(getDoOnSubscribe())
                        .doFinally(getDoFinally());
            } else {
                // Observable
                return ((Observable<Object>) nextAdapter.adapt(call))
                        .doOnSubscribe(getDoOnSubscribe())
                        .doFinally(getDoFinally());
            }
        }
    
        @NotNull
        private Consumer<Disposable> getDoOnSubscribe() {
            return disposable -> {
    
            };
        }
    
        @NotNull
        private Action getDoFinally() {
            return () -> {
    
            };
        }
    }
    

    Then register it when building the Retrofit object, before RxJava2CallAdapterFactory, so that nextCallAdapter can delegate to the RxJava adapter registered after it:

    RetrofitApi retrofitApi = new Retrofit.Builder()
            // baseUrl(..) and any converter factories go here
            .addCallAdapterFactory(new CallAdapter.Factory() {
                @Override
                public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
                    // Look up the adapter from the factories registered after this one.
                    CallAdapter<?, ?> nextAdapter = retrofit.nextCallAdapter(this, returnType, annotations);
                    Class<?> rawType = getRawType(returnType);
                    if (rawType == Single.class || rawType == Observable.class || rawType == Completable.class) {
                        return new RxStreamAdapter(rawType, returnType, nextAdapter);
                    } else {
                        return nextAdapter;
                    }
                }
            })
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .build()
            .create(RetrofitApi.class);
    

    You can also set global hooks using RxJavaPlugins, but those apply to every stream, so you cannot differentiate between ordinary streams and Retrofit streams.

    Hope it helps!