
How to retry only requests that failed with a 5xx response in under a given time threshold?


Tech stack: rx-java 1.1.x, retrofit 1.9.x, spring 4.3.x.

A bit of context: I'm fairly new to rx-java. My service A has an API endpoint that makes a search call to service B, which is frequently used and fails a bit more often than it should. Some of the errors are clear timeouts from other services further down the chain that took more than 30s, but quite a lot of them are quick failures that come back in under 1s.

What exactly I'm trying to do: Retry only the calls that fail under a given threshold (let's say under 1s), ideally only the ones returning 5xx HTTP responses.

Ideas that came to my mind, but do not solve the problem: Regular Observable.timeout() seems of no use, because for now I don't want to touch (interrupt) the calls that take longer. I only want to retry those that came back as failed (5xx response). retry() seems of no use, because I don't want to simply retry every failed call. retryWhen() could be of use, but I am not sure how I can extract the HTTP status code from a Throwable, or what exactly I should measure in the Observable call.

Code:

@RestController
@RequestMapping(...)
public class MyController {

    @RequestMapping(method = GET)
    public DeferredResult<MyJsonWrapper> fetchSomething(
            MySearchRequest searchRequest,
            BindingResult bindingResult,
            HttpServletRequest request) {

        return new MyDeferredResult<>(
                serviceB.searchSomething(...)
                        .doOnNext( result -> /* log size of search */ ));
    }
}

serviceB.searchSomething(...) returns an Observable<MyJsonWrapper>.

What is MyDeferredResult:

class MyDeferredResult<T> extends DeferredResult<T> {

    public MyDeferredResult(Observable<T> observable) {

        onTimeout(this::handleTimeout);
        ConnectableObservable<T> publication = observable.publish();
        publication.subscribe(this::onNext, this::onError, this::onCompleted);
        publication.connect(subscription -> this.subscription = subscription);
    }

    (...)
    private void handleTimeout() {
        setErrorResult(new MyTimeoutException( /* some info about request */ ));
        subscription.unsubscribe();
    }

How can I retry only the requests that failed with a 5xx HTTP response in under 1s?


Solution

  • I have been able to implement a working solution. To measure the Observable's execution time I used Spring's StopWatch, starting it in doOnSubscribe() and stopping it in doOnTerminate().

    I create the StopWatch and pass it to my custom retry function used in retryWhen(). Only when execution reaches the retryWhen() block do I check whether the elapsed time was under my given threshold.

    This is what my call looks like now:

    StopWatch stopWatch = new StopWatch();
    int executionTimeThresholdMillis = 1000; // 1 second
    
    return new MyDeferredResult(
        serviceB.searchSomething(...)
        .doOnSubscribe(stopWatch::start)
        .doOnTerminate(stopWatch::stop)
        .retryWhen(
            RetryGivenHttpResponsesUnderThreshold.builder()
                .maxRetries(MAX_RETRIES)
                .httpResponsesToRetry(List.of(HTTP_CODE_TO_FAIL))
                .observableExecutionTime(stopWatch)
                .executionTimeThresholdMillis(executionTimeThresholdMillis)
                .build())
        .doOnNext( result -> /* log size of search */ ));
    }
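
To make the timing idea concrete outside of RxJava, here is a minimal plain-Java sketch of the same rule: time each attempt separately and retry only when a 5xx response came back within the threshold. `TimedRetry`, `execute`, and the injected `clockMillis` are hypothetical names introduced for illustration. The call is modelled as a function returning an HTTP status code, which is a simplification of the real Observable.

```java
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;

// Hypothetical helper, not part of the original solution.
final class TimedRetry {

    /**
     * Runs the call until it returns a non-5xx status, fails slowly,
     * or the retry budget is used up. Returns the number of attempts made.
     */
    static int execute(IntSupplier call, LongSupplier clockMillis,
                       long thresholdMillis, int maxRetries) {
        int attempts = 0;
        while (true) {
            // Measure this attempt only, never the sum of all attempts.
            long start = clockMillis.getAsLong();
            int status = call.getAsInt();
            long elapsed = clockMillis.getAsLong() - start;
            attempts++;

            boolean serverError = status >= 500 && status <= 599;
            boolean fastFailure = elapsed <= thresholdMillis;

            // Stop on success, on a slow failure, or once the initial
            // attempt plus maxRetries retries have been made.
            if (!serverError || !fastFailure || attempts > maxRetries) {
                return attempts;
            }
        }
    }
}
```

In real code the clock could be `System::currentTimeMillis`; injecting it here only keeps the sketch deterministic when tested.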
    

    Now, an example of how you could implement the retry function. I wanted to check both the HTTP response and the elapsed time, so the code is only somewhat configurable. I hope someone else will find it useful and adapt it to their own needs:

    public class RetryGivenHttpResponsesUnderThreshold implements Func1<Observable<? extends Throwable>, Observable<?>> {
    
    
    private final int maxRetries;
    private final Collection<Integer> httpResponsesToRetry;
    private int retryCount;
    
    private final boolean isMeasurable;
    private final long maxObservableExecutionTimeMilis;
    private final StopWatch stopWatch;
    
    (...) 
    // constructors, builders, validations...
    
    @Override
    public Observable<?> call(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(throwable -> {
                    boolean needsRetry = false;
                    if (throwable instanceof HttpException) {
                        if (httpResponsesToRetry.contains(((HttpException) throwable).code())) {
    
                            // IMPORTANT: in my case I want getLastTaskTimeMillis(), and NOT getTotalTimeMillis().
                            // The timer is stopped on every error that triggers a retry and restarted on resubscription,
                            // so the total would add up the time of all attempts.
                            final long observableExecutionTimeMilis = stopWatch.getLastTaskTimeMillis();
    
                            if (isMeasurable) {
                                needsRetry = observableExecutionTimeMilis <= maxObservableExecutionTimeMilis;
                            } else  {
                                needsRetry = true;
                            }
                        }
                    }
    
                    if (needsRetry && retryCount < maxRetries) {
                        retryCount++;
                        // Simply retry.
                        return Observable.just(null);
                    }
    
                    // Just pass the error along.
                    return Observable.error(throwable);
                });
    }
    

    }
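
The decision made inside flatMap() can also be pulled out into a pure function, which makes it testable without RxJava, Retrofit, or Spring. The following is a minimal sketch under that assumption. `RetryDecision` and `shouldRetry` are hypothetical names, and the status code and last-attempt time are passed in as plain values instead of being read from the exception and the StopWatch.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper mirroring the fields of the retry function above.
final class RetryDecision {

    private final Set<Integer> httpResponsesToRetry;
    private final int maxRetries;
    private final boolean isMeasurable;
    private final long maxExecutionTimeMillis;

    RetryDecision(Collection<Integer> httpResponsesToRetry, int maxRetries,
                  boolean isMeasurable, long maxExecutionTimeMillis) {
        this.httpResponsesToRetry = new HashSet<>(httpResponsesToRetry);
        this.maxRetries = maxRetries;
        this.isMeasurable = isMeasurable;
        this.maxExecutionTimeMillis = maxExecutionTimeMillis;
    }

    /**
     * @param statusCode        HTTP status of the failed attempt
     * @param lastAttemptMillis duration of the last attempt only
     * @param retryCount        retries already performed
     */
    boolean shouldRetry(int statusCode, long lastAttemptMillis, int retryCount) {
        if (retryCount >= maxRetries) {
            return false; // retry budget used up
        }
        if (!httpResponsesToRetry.contains(statusCode)) {
            return false; // not one of the configured status codes
        }
        // When timing is disabled, every matching status code is retried.
        return !isMeasurable || lastAttemptMillis <= maxExecutionTimeMillis;
    }
}
```

The retry function could then delegate to such a helper and keep only the Observable plumbing, though that split is a design suggestion rather than part of the solution above.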