Search code examples
javajava-11completable-futureretry-logic

Java 11 Completable Future retry on a custom condition and not on exception


I have a completable future defined below

CompletableFuture<Person> personFutures = personService.getPersons();

Now, based on a particular condition, I need to check and do the call to getPersons until the condition is matched or the number of retries (5 retries, 5seconds apart) have finished.

The condition will be

if(personFutures.get().size() != totalPersonsInOrg){
 retry(personService.getPersons(), 5, 5)
} else {
 return persons
}

I want to use the thenApply and thenCompose to chain these after the first completablefuture.

personFutures.thenApply(persons -> {
     if(persons.size() != totalPersonsOrg){
      retry(personservice,5,5)
     }
})

This is what needs to be changed

private boolean allPersonsFound(String id, int retry, int c 
         count) 
{ 
    if (retry > maxRetries) {
        return false;
    }

     CompletableFuture<List<Persons>> personsFuture = personaService.getPersons();
    List<Persons> persons = personsFuture.get();

    if (persons.size() != count) {
        //add delay of 50ms
        return allPersonsFound(id, retry++, count);
    }
    return true;
}

Solution

  • Assuming your PersonsService is:

    interface PersonsService {
       CompletableFuture<Persons> getPersons();
    }
    

    You probably want to have a proxy implementation with additional validation and retry logic.

    One possibility is to use asynchronous recursion. Something like this (I have not tried to run it!):

    final class ValidatedPersonsService implements PersonsService {
        private final PersonsService upstreamService;
        private final Predicate<Persons> validationPredicate;
        private final int numberOfAttempts;
        private final long backoffDuratioMs;
        private final Executor executor;
        private final Executor delayedExecutor;
    
        ValidatedPersonsService(final PersonsService upstreamService,
                                final Predicate<Persons> validationPredicate,
                                final int numberOfAttempts,
                                final long backoffDuratioMs,
                                final Executor executor) {
            this.upstreamService = upstreamService;
            this.validationPredicate = validationPredicate;
            this.numberOfAttempts = numberOfAttempts;
            this.backoffDuratioMs = backoffDuratioMs;
            this.executor = executor;
            this.delayedExecutor = CompletableFuture.delayedExecutor(backoffDuratioMs, TimeUnit.MILLISECONDS, executor);
        }
    
        // this one is needed to track number of passed attempts through the async recursion steps
        private static final class PersonsResponse {
            final Persons persons;
            final int attempt;
    
            private PersonsResponse(final Persons persons, final int attempt) {
                this.persons = persons;
                this.attempt = attempt;
            }
        }
    
        @Override
        public CompletableFuture<Persons> getPersons() {
            return submitRequest(1, executor)
                    .thenApply(response -> response.persons);
        }
    
        private CompletableFuture<PersonsResponse> submitRequest(int currentAttempt, Executor executor) {
            if (currentAttempt > numberOfAttempts) {
                return CompletableFuture.failedFuture(new RuntimeException("max number of attempts exceeded"));
            } else {
                return upstreamService
                        .getPersons()
                        .thenApplyAsync(persons -> new PersonsResponse(persons, currentAttempt), executor) // break out into our executor, to not rely on concurrency model of the upstream service
                        .thenCompose(this::validateResponse);
            }
        }
    
        private CompletableFuture<PersonsResponse> validateResponse(PersonsResponse response) {
            if (validationPredicate.test(response.persons)) {
                return CompletableFuture.completedFuture(response);
            } else {
                return submitRequest(response.attempt + 1, delayedExecutor);
            }
        }
    }