Search code examples
javaexceptionasynchronousconcurrencyjava-8

Retry logic with CompletableFuture


I need to submit a task in an async framework I'm working on, but I need to catch for exceptions, and retry the same task multiple times before "aborting".

The code I'm working with is:

int retries = 0;
public CompletableFuture<Result> executeActionAsync() {

    // Execute the action async and get the future
    CompletableFuture<Result> f = executeMycustomActionHere();

    // If the future completes with exception:
    f.exceptionally(ex -> {
        retries++; // Increment the retry count
        if (retries < MAX_RETRIES)
            return executeActionAsync();  // <--- Submit one more time

        // Abort with a null value
        return null;
    });

    // Return the future    
    return f;
}

This currently doesn't compile because the return type of the lambda is wrong: it expects a Result, but the executeActionAsync returns a CompletableFuture<Result>.

How can I implement this fully async retry logic?


Solution

  • I think I was successfully. Here's an example class I created and the test code:


    RetriableTask.java

    public class RetriableTask
    {
        protected static final int MAX_RETRIES = 10;
        protected int retries = 0;
        protected int n = 0;
        protected CompletableFuture<Integer> future = new CompletableFuture<Integer>();
    
        public RetriableTask(int number) {
            n = number;
        }
    
        public CompletableFuture<Integer> executeAsync() {
            // Create a failure within variable timeout
            Duration timeoutInMilliseconds = Duration.ofMillis(1*(int)Math.pow(2, retries));
            CompletableFuture<Integer> timeoutFuture = Utils.failAfter(timeoutInMilliseconds);
    
            // Create a dummy future and complete only if (n > 5 && retries > 5) so we can test for both completion and timeouts. 
            // In real application this should be a real future
            final CompletableFuture<Integer> taskFuture = new CompletableFuture<>();
            if (n > 5 && retries > 5)
                taskFuture.complete(retries * n);
    
            // Attach the failure future to the task future, and perform a check on completion
            taskFuture.applyToEither(timeoutFuture, Function.identity())
                .whenCompleteAsync((result, exception) -> {
                    if (exception == null) {
                        future.complete(result);
                    } else {
                        retries++;
                        if (retries >= MAX_RETRIES) {
                            future.completeExceptionally(exception);
                        } else {
                            executeAsync();
                        }
                    }
                });
    
            // Return the future    
            return future;
        }
    }
    

    Usage

    int size = 10;
    System.out.println("generating...");
    List<RetriableTask> tasks = new ArrayList<>();
    for (int i = 0; i < size; i++) {
        tasks.add(new RetriableTask(i));
    }
    
    System.out.println("issuing...");
    List<CompletableFuture<Integer>> futures = new ArrayList<>();
    for (int i = 0; i < size; i++) {
        futures.add(tasks.get(i).executeAsync());
    }
    
    System.out.println("Waiting...");
    for (int i = 0; i < size; i++) {
        try {
            CompletableFuture<Integer> future = futures.get(i);
            int result = future.get();
            System.out.println(i + " result is " + result);
        } catch (Exception ex) {
            System.out.println(i + " I got exception!");
        }
    }
    System.out.println("Done waiting...");
    

    Output

    generating...
    issuing...
    Waiting...
    0 I got exception!
    1 I got exception!
    2 I got exception!
    3 I got exception!
    4 I got exception!
    5 I got exception!
    6 result is 36
    7 result is 42
    8 result is 48
    9 result is 54
    Done waiting...
    

    Main idea and some glue code (failAfter function) come from here.

    Any other suggestions or improvement are welcome.