I have code like the following:
authRepository.login(userName, password)
        .doOnSubscribe(__ -> apiProgress.setValue(ApiProgress.start()))
        .doFinally(() -> apiProgress.setValue(ApiProgress.stop()))
        .subscribe(login -> loginData.setValue(login),
                err -> apiError.setValue(ApiError.create(err)));
I need to repeat doOnSubscribe(..) and doFinally for every API call.
Is there a way to apply them to all calls without repeating them each time?
You can do this with a Transformer:
static <T> SingleTransformer<T, T> subscribeAndFinalTransformer() {
    return new SingleTransformer<T, T>() {
        @Override
        public SingleSource<T> apply(Single<T> upstream) {
            return upstream.doOnSubscribe(disposable -> {
                // Your doOnSubscribe Block
            }).doFinally(() -> {
                // Your doFinally Block
            });
        }
    };
}
The reusable Transformer above can be attached to any Single using compose:
authRepository.login(userName, password).compose(subscribeAndFinalTransformer())
If you are using Observable or Completable, use the equivalent transformer (ObservableTransformer or CompletableTransformer) instead of SingleTransformer.
The above approach is convenient if you want to reuse the actions for only certain calls.
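To make the behavior of such a transformer concrete without pulling in RxJava, here is a small plain-Java model. It is only an analogy under stated assumptions: `ProgressModel`, `Source`, and `progress` are names invented for this sketch and are not RxJava APIs. `Source` stands in for a Single, and the sketch assumes the start action should run at subscription and the stop action should run after either a success or an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Plain-Java model, not RxJava: Source stands in for Single.
final class ProgressModel {
    private ProgressModel() {}

    interface Source<T> {
        void subscribe(Consumer<T> onSuccess, Consumer<Throwable> onError);

        // Same idea as compose: hand this source to a reusable wrapper.
        default Source<T> compose(UnaryOperator<Source<T>> transformer) {
            return transformer.apply(this);
        }
    }

    // Reusable wrapper: onStart runs at subscription, onFinally after success or error.
    static <T> UnaryOperator<Source<T>> progress(Runnable onStart, Runnable onFinally) {
        return upstream -> (onSuccess, onError) -> {
            onStart.run();
            upstream.subscribe(
                    value -> { try { onSuccess.accept(value); } finally { onFinally.run(); } },
                    error -> { try { onError.accept(error); } finally { onFinally.run(); } });
        };
    }

    // Subscribes to one succeeding and one failing source and records what happens.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Runnable start = () -> events.add("start");
        Runnable stop = () -> events.add("stop");
        Source<String> ok = (onSuccess, onError) -> onSuccess.accept("token");
        Source<String> failing = (onSuccess, onError) -> onError.accept(new IllegalStateException("boom"));

        ok.compose(progress(start, stop))
                .subscribe(v -> events.add("result:" + v), e -> events.add("error:" + e.getMessage()));
        failing.compose(progress(start, stop))
                .subscribe(v -> events.add("result:" + v), e -> events.add("error:" + e.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the model is that the wrapper is written once and attached per call, and that the stop action fires on both the success path and the error path. Real RxJava operators also handle disposal and threading, which this sketch ignores.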
If you want to attach the actions to all of your API calls, you can create a Retrofit CallAdapter:
class RxStreamAdapter implements CallAdapter {
    private final Class rawType;
    private final CallAdapter<Object, Object> nextAdapter;
    private final Type returnType;

    RxStreamAdapter(Class rawType,
                    Type returnType,
                    CallAdapter nextAdapter) {
        this.rawType = rawType;
        this.returnType = returnType;
        this.nextAdapter = nextAdapter;
    }

    @Override
    public Type responseType() {
        return nextAdapter.responseType();
    }

    @Override
    public Object adapt(Call call) {
        // Let the next adapter build the stream, then attach the shared actions.
        if (rawType == Single.class) {
            return ((Single) nextAdapter.adapt(call))
                    .doOnSubscribe(getDoOnSubscribe())
                    .doFinally(getDoFinally());
        } else if (rawType == Completable.class) {
            return ((Completable) nextAdapter.adapt(call))
                    .doOnSubscribe(getDoOnSubscribe())
                    .doFinally(getDoFinally());
        } else {
            // Observable
            return ((Observable<Object>) nextAdapter.adapt(call))
                    .doOnSubscribe(getDoOnSubscribe())
                    .doFinally(getDoFinally());
        }
    }

    private Consumer<Disposable> getDoOnSubscribe() {
        return disposable -> {
            // Your doOnSubscribe Block
        };
    }

    private Action getDoFinally() {
        return () -> {
            // Your doFinally Block
        };
    }
}
Then add it when creating the Retrofit object (before RxJava2CallAdapterFactory):
RetrofitApi retrofitApi = new Retrofit.Builder()
        // ... baseUrl and other builder settings
        .addCallAdapterFactory(new CallAdapter.Factory() {
            @Override
            public CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
                CallAdapter<?, ?> nextAdapter = retrofit.nextCallAdapter(this, returnType, annotations);
                Class<?> rawType = getRawType(returnType);
                if (rawType == Single.class || rawType == Observable.class || rawType == Completable.class) {
                    return new RxStreamAdapter(rawType, returnType, nextAdapter);
                } else {
                    return nextAdapter;
                }
            }
        })
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
        .build()
        .create(RetrofitApi.class);
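The registration order matters because retrofit.nextCallAdapter(this, ...) only consults factories registered after the one passed in. The sketch below is a simplified plain-Java model of that lookup, not Retrofit's actual code: `FactoryChainModel`, `Factory`, `rxFactory`, `wrappingFactory`, and the string "adapters" are hypothetical stand-ins used only to show the ordering effect.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of an ordered factory chain; hypothetical types, not Retrofit's API.
final class FactoryChainModel {
    private FactoryChainModel() {}

    interface Factory {
        // Returns a description of the adapter, or null if this factory does not handle the type.
        String get(String returnType, List<Factory> chain);
    }

    // Consults only the factories registered after skipPast (null means start from the first).
    static String nextAdapter(List<Factory> chain, Factory skipPast, String returnType) {
        for (int i = chain.indexOf(skipPast) + 1; i < chain.size(); i++) {
            String adapter = chain.get(i).get(returnType, chain);
            if (adapter != null) {
                return adapter;
            }
        }
        throw new IllegalArgumentException("No adapter for " + returnType);
    }

    // Stand-in for the RxJava adapter factory: handles only "Single".
    static Factory rxFactory() {
        return (returnType, chain) -> "Single".equals(returnType) ? "rx" : null;
    }

    // Stand-in for the wrapping factory: decorates whatever a later factory produces.
    static Factory wrappingFactory() {
        return new Factory() {
            @Override
            public String get(String returnType, List<Factory> chain) {
                return "progress(" + nextAdapter(chain, this, returnType) + ")";
            }
        };
    }

    public static void main(String[] args) {
        List<Factory> wrapperFirst = Arrays.asList(wrappingFactory(), rxFactory());
        List<Factory> wrapperLast = Arrays.asList(rxFactory(), wrappingFactory());
        System.out.println(nextAdapter(wrapperFirst, null, "Single"));
        System.out.println(nextAdapter(wrapperLast, null, "Single"));
    }
}
```

With the wrapper registered first, the lookup yields the decorated adapter. With it registered last, the RxJava stand-in answers first and the wrapper is never consulted, so the shared actions would not be attached.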
You can also set hooks using RxJavaPlugins, but then you cannot differentiate between ordinary streams and Retrofit streams.
Hope it helps!