Search code examples
androidrx-javareactive-programmingrx-androidreactivex

Limit actions with rxJava and retryWhen operator


My application must do two things in general:

  • Accept only one network request at the same time
  • Retry if request failed

That's how I implement it:

public class RequestsLocker {

    private volatile boolean isLocked;

    public <T> Observable.Transformer<T, T> applyLocker() {
        if(!isLocked()) {
            return observable -> observable
                    .doOnSubscribe(() -> {
                        lockChannel();
                    })
                    .doOnUnsubscribe(() -> {
                        freeChannel();
                    });
        } else {
            return observable -> Observable.error(new ChannelBusyException("Channel is busy now."));
        }
    }

    private void lockChannel() {
        isLocked = true;
    }

    private void freeChannel() {
        isLocked = false;
    }

    public boolean isLocked() {
        return isLocked;
    }

}

Looks nice.

Now my retryWhen implementation:

public static Observable<?> retryWhenAnyIoExceptionWithDelay(Observable<? extends Throwable> observable) {
    return observable.flatMap(error -> {
        // For IOExceptions, we  retry
        if (error instanceof IOException) {
            return Observable.timer(2, TimeUnit.SECONDS);
        }

        // For anything else, don't retry
        return Observable.error(error);
    });
}

There is how I use it:

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes))
            .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE));
}

...

public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                             String notes, Subscriber<List<QueueCarItem>> subscriber) {
    queueApiMediator.finishService(carItem.getId(), paymentType, notes)
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
            .subscribe(subscriber);
}

The main problem that doOnUnsubscribe() called on any error and then locker is open for any new request until the timer expires and resubscribing happens again. That's the problem. While the timer is ticking user can make another request.

How I can fix it?


Solution

  • The problem is that you're applying your transformer to the source observable i.e. before your retrywhen. When there is an error you're always going to unsubscribe from and then resubscribe to the source observable leading to your doOnUnsubscribe being called.

    I suggest you try

    public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
        return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes));            
    }
    
    
    public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                                 String notes, Subscriber<List<QueueCarItem>> subscriber) {
        queueApiMediator.finishService(carItem.getId(), paymentType, notes)
                .subscribeOn(ioScheduler)
                .observeOn(uiScheduler)
                .doOnError(this::handleError)
                .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
                .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE));
                .subscribe(subscriber);
    }
    

    PS: The apply locker transformer looks a bit different i.e. it doesn't take an argument in the code you linked.