Tech stack: rx-java 1.1.x, retrofit 1.9.x, spring 4.3.x.
A bit of context: I'm pretty new to rx-java. My service A has an API endpoint that makes a search call to service B. The endpoint is used frequently and fails more often than it should. Some of the errors are clear timeouts from other services further downstream that took >30s, but quite a lot of them are quick failures that come back in under 1s.
What exactly I'm trying to do: retry only the calls that fail within a given threshold (let's say <1s), ideally only the ones returning 5xx HTTP responses.
Ideas that came to my mind, but do not solve the problem:
Regular Observable.timeout() seems of no use, because for now I don't want to touch (interrupt) calls that are taking longer. I only want to retry those that came back as failed (5xx response), not interrupt the longer ones.
retry() seems of no use, because I don't want to simply retry every failed call.
retryWhen() could be of use, but I am not sure how I can extract the HTTP status code from a Throwable and what exactly I should measure in the Observable call.
Code:
@RestController
@RequestMapping(...)
public class MyController {

    @RequestMapping(method = GET)
    public DeferredResult<MyJsonWrapper> fetchSomething(
            MySearchRequest searchRequest,
            BindingResult bindingResult,
            HttpServletRequest request) {
        return new MyDeferredResult<>(
                serviceB.searchSomething(...)
                        .doOnNext(result -> /* log size of search */ ));
    }
}
serviceB.searchSomething(...) also returns Observable<MyJsonWrapper>.
What is MyDeferredResult:
class MyDeferredResult<T> extends DeferredResult<T> {

    public MyDeferredResult(Observable<T> observable) {
        onTimeout(this::handleTimeout);
        ConnectableObservable<T> publication = observable.publish();
        publication.subscribe(this::onNext, this::onError, this::onCompleted);
        // keep the subscription so the timeout handler can unsubscribe
        publication.connect(subscription -> this.subscription = subscription);
    }

    (...)

    private void handleTimeout() {
        setErrorResult(new MyTimeoutException( /* some info about request */ ));
        subscription.unsubscribe();
    }
}
How can I retry only the requests that failed under 1s that are 5xx HTTP responses?
I have been able to implement a working solution. To measure the Observable's execution time I chose Spring's StopWatch, which I start in doOnSubscribe() and stop in doOnTerminate().
I create the StopWatch and pass it to my custom retry function used in retryWhen(). Only when execution reaches the retryWhen() block do I check whether the elapsed time was under my threshold.
What my call looks like now:
    StopWatch stopWatch = new StopWatch();
    int executionTimeThresholdMillis = 1000; // 1 second
    return new MyDeferredResult<>(
            serviceB.searchSomething(...)
                    // every (re)subscription starts a new StopWatch task,
                    // so each attempt is timed separately
                    .doOnSubscribe(stopWatch::start)
                    .doOnTerminate(stopWatch::stop)
                    .retryWhen(
                            RetryGivenHttpResponsesUnderThreshold.builder()
                                    .maxRetries(MAX_RETRIES)
                                    .httpResponsesToRetry(List.of(HTTP_CODE_TO_FAIL))
                                    .observableExecutionTime(stopWatch)
                                    .executionTimeThresholdMillis(executionTimeThresholdMillis)
                                    .build())
                    .doOnNext(result -> /* log size of search */ ));
}
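For anyone who wants to see the timing idea without RxJava in the way, here is a minimal plain-Java sketch of the same rule: time each attempt on its own and retry only when the attempt failed fast with a retryable error. The class FastFailureRetry, the method callWithRetry and its parameters are hypothetical names made up for illustration; they are not part of RxJava, Retrofit or Spring, and the sketch is blocking, unlike the Observable chain above.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

public class FastFailureRetry {

    /**
     * Runs the call and retries only when an attempt fails quickly with a retryable error.
     * Hypothetical helper for illustration; not an RxJava or Spring API.
     */
    public static <T> T callWithRetry(Supplier<T> call,
                                      Predicate<RuntimeException> retryable,
                                      long thresholdMillis,
                                      int maxRetries) {
        int retryCount = 0;
        while (true) {
            // Restart the clock for every attempt, so a retry is judged on its own duration
            // (the same reason the answer reads the last task time, not the total time).
            long startedAt = System.nanoTime();
            try {
                return call.get();
            } catch (RuntimeException e) {
                long elapsedMillis = (System.nanoTime() - startedAt) / 1_000_000;
                boolean failedFast = elapsedMillis <= thresholdMillis;
                if (!failedFast || !retryable.test(e) || retryCount >= maxRetries) {
                    throw e; // slow failure, non-retryable error, or retries exhausted
                }
                retryCount++;
            }
        }
    }
}
```

With a threshold of 1000 ms and maxRetries of 3, a call that fails fast twice and then succeeds is invoked three times in total, while a failure the predicate rejects is rethrown after the first attempt.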
Now, an example of how you could implement the retry function. I wanted to check both the HTTP response code and the elapsed time, so the code is only somewhat configurable. I hope someone else finds it useful and adapts it to their own needs:
public class RetryGivenHttpResponsesUnderThreshold implements Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final Collection<Integer> httpResponsesToRetry;
    private int retryCount;
    private final boolean isMeasurable;
    private final long maxObservableExecutionTimeMilis;
    private final StopWatch stopWatch;

    (...)
    // constructors, builders, validations...

    @Override
    public Observable<?> call(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(throwable -> {
                    boolean needsRetry = false;
                    if (throwable instanceof HttpException) {
                        if (httpResponsesToRetry.contains(((HttpException) throwable).code())) {
                            // !IMPORTANT! in my case I want to get getLastTaskTimeMillis(), and NOT getTotalTimeMillis()
                            // because the timer will be stopped on every error that will trigger retry
                            final long observableExecutionTimeMilis = stopWatch.getLastTaskTimeMillis();
                            if (isMeasurable) {
                                needsRetry = observableExecutionTimeMilis <= maxObservableExecutionTimeMilis;
                            } else {
                                needsRetry = true;
                            }
                        }
                    }
                    if (needsRetry && retryCount < maxRetries) {
                        retryCount++;
                        // Simply retry.
                        return Observable.just(null);
                    }
                    // Just pass the error along.
                    return Observable.error(throwable);
                });
    }
}
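If you want to unit-test the decision itself without spinning up an Observable, one option is to pull it out into a plain method. The sketch below is only an illustration of that idea: RetryDecision and shouldRetry are hypothetical names, the status code and elapsed time are passed in as plain values, and a null threshold plays the role of isMeasurable == false in the class above.

```java
import java.util.Collection;
import java.util.HashSet;

public class RetryDecision {

    private final Collection<Integer> httpResponsesToRetry;
    private final int maxRetries;
    private final Long thresholdMillis; // null means "do not check the elapsed time"

    public RetryDecision(Collection<Integer> httpResponsesToRetry, int maxRetries, Long thresholdMillis) {
        // defensive copy so later changes to the caller's collection have no effect
        this.httpResponsesToRetry = new HashSet<>(httpResponsesToRetry);
        this.maxRetries = maxRetries;
        this.thresholdMillis = thresholdMillis;
    }

    /**
     * @param statusCode        HTTP status code of the failed attempt
     * @param lastAttemptMillis duration of the failed attempt only, not of all attempts
     * @param retriesSoFar      number of retries already performed
     */
    public boolean shouldRetry(int statusCode, long lastAttemptMillis, int retriesSoFar) {
        if (retriesSoFar >= maxRetries) {
            return false; // retry budget used up
        }
        if (!httpResponsesToRetry.contains(statusCode)) {
            return false; // not one of the configured status codes
        }
        // retry only failures that came back within the threshold, if one is configured
        return thresholdMillis == null || lastAttemptMillis <= thresholdMillis;
    }
}
```

The flatMap lambda in the retry function could then delegate to shouldRetry(...) and keep only the Rx plumbing (Observable.just(null) versus Observable.error(throwable)). Because the retry count is passed in rather than stored, the decision object itself stays stateless.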