My application must do two things in general:
Here is how I implemented it:
public class RequestsLocker {

    private volatile boolean isLocked;

    public <T> Observable.Transformer<T, T> applyLocker() {
        if (!isLocked()) {
            return observable -> observable
                    .doOnSubscribe(() -> {
                        lockChannel();
                    })
                    .doOnUnsubscribe(() -> {
                        freeChannel();
                    });
        } else {
            return observable -> Observable.error(new ChannelBusyException("Channel is busy now."));
        }
    }

    private void lockChannel() {
        isLocked = true;
    }

    private void freeChannel() {
        isLocked = false;
    }

    public boolean isLocked() {
        return isLocked;
    }
}
Looks nice.
Now my retryWhen implementation:
public static Observable<?> retryWhenAnyIoExceptionWithDelay(Observable<? extends Throwable> observable) {
    return observable.flatMap(error -> {
        // For IOExceptions, we retry
        if (error instanceof IOException) {
            return Observable.timer(2, TimeUnit.SECONDS);
        }
        // For anything else, don't retry
        return Observable.error(error);
    });
}
Here is how I use it:
public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes))
            .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE));
}
...
public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                             String notes, Subscriber<List<QueueCarItem>> subscriber) {
    queueApiMediator.finishService(carItem.getId(), paymentType, notes)
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
            .subscribe(subscriber);
}
The main problem is that doOnUnsubscribe() is called on any error, so the locker is open to new requests until the timer expires and the resubscription happens. While the timer is ticking, the user can make another request.
How can I fix it?
The problem is that you're applying your transformer to the source observable, i.e. before your retryWhen.
When there is an error, you're always going to unsubscribe from and then resubscribe to the source observable, which leads to your doOnUnsubscribe being called and the lock being freed between attempts.
I suggest you move the compose call so that it comes after retryWhen:
public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes));
}

public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                             String notes, Subscriber<List<QueueCarItem>> subscriber) {
    queueApiMediator.finishService(carItem.getId(), paymentType, notes)
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
            // The locker now wraps the retry logic, so it stays locked across retries
            .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE))
            .subscribe(subscriber);
}
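To illustrate why the position of the locker relative to the retry matters, here is a rough plain-Java model of the two orderings. It is not RxJava and only approximates the operator semantics: `withLock` stands in for the doOnSubscribe/doOnUnsubscribe pair, `withRetry` for retryWhen, and `flakySource` is a hypothetical source that fails a fixed number of times. All of these names are made up for the sketch.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class LockPlacementModel {

    // Event log so the order of lock/free calls can be inspected.
    static final List<String> log = new ArrayList<>();

    // Hypothetical source: throws IOException on the first `failures` calls.
    static Callable<String> flakySource(int failures) {
        int[] calls = {0};
        return () -> {
            if (calls[0]++ < failures) {
                throw new IOException("network down");
            }
            return "ok";
        };
    }

    // Models doOnSubscribe/doOnUnsubscribe: lock before one run, free when it ends.
    static <T> Callable<T> withLock(Callable<T> inner) {
        return () -> {
            log.add("lock");
            try {
                return inner.call();
            } finally {
                log.add("free");
            }
        };
    }

    // Models retryWhen: every retry is a fresh run ("resubscription") of `inner`.
    static <T> Callable<T> withRetry(Callable<T> inner, int maxRetries) {
        return () -> {
            for (int attempt = 0; ; attempt++) {
                try {
                    return inner.call();
                } catch (IOException e) {
                    if (attempt >= maxRetries) {
                        throw e;
                    }
                    log.add("wait"); // stands in for the 2-second timer
                }
            }
        };
    }

    // Runs a chain once and returns a copy of the recorded events.
    static List<String> run(Callable<String> chain) throws Exception {
        log.clear();
        chain.call();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) throws Exception {
        // Lock upstream of the retry, as in the question:
        // prints [lock, free, wait, lock, free]
        System.out.println(run(withRetry(withLock(flakySource(1)), 3)));

        // Lock downstream of the retry, as in the suggested code:
        // prints [lock, wait, free]
        System.out.println(run(withLock(withRetry(flakySource(1), 3))));
    }
}
```

In the first ordering the lock is freed before the wait, which is the window in which another request can slip through. In the second, the lock is held for the whole retry sequence and freed only once at the end.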
PS: The applyLocker transformer looks a bit different in the code you linked, i.e. it doesn't take an argument there.