Search code examples
javamultithreadingjava-threadscompletable-future

How to perform resource cleanup for CompletableFuture in Java?


I've a piece of code in CompletableFuture that performs retry if there are exceptions else completes the task. I've passed a resource to the Supplier and Consumer to perform the task and want to close those resource once all the tasks are completed (success/exception after 3 retries).

Here is the piece of code:

Supplier mySupplier = new MySupplier(localContext);
CompletableFuture<String> future = CompletableFuture.supplyAsync(mySupplier);
for(int j = 0; j < (retryCount - 1); j++) {
    LOGGER.debug("MySupplier accept() Retry count: "+j);
    future = future.handleAsync((value, throwable) -> throwable == null? CompletableFuture.completedFuture(value): CompletableFuture.supplyAsync(mySupplier)).thenComposeAsync(Function.identity());
}

I was planning to put it under the finally block of my supplier but if the first exception occurs the resource would be closed and I need them for the next two retry.

1) How to make it work?

2) Also is there are ways to print the number of retry only in case of exception?


Solution

  • Since you don't seem to care about the intermediate results, the easiest solution is to simply wrap your Supplier in another one that handles retries:

    class SupplierRetrier<T> implements Supplier<T> {
        private static final Logger LOGGER = LoggerFactory.getLogger(SupplierRetrier.class);
        final Supplier<T> wrappee;
        final int maxRetries;
    
        SupplierRetrier(Supplier<T> wrappee, int maxRetries) {
            Objects.requireNonNull(wrappee);
            if (maxRetries <= 0) {
                throw new IllegalArgumentException("maxRetries must be more than 0: " + maxRetries);
            }
            this.wrappee = wrappee;
            this.maxRetries = maxRetries;
        }
    
        @Override
        public T get() {
            RuntimeException lastException = null;
            for (int i = 0; i < maxRetries; i++) {
                try {
                    LOGGER.info("MySupplier accept() Retry count: "+i);
                    return wrappee.get();
                } catch (RuntimeException e) {
                    lastException = e;
                }
            }
            throw lastException;
        }
    }
    

    You can then simply use it with:

    CompletableFuture<String> future = CompletableFuture.supplyAsync(
            new SupplierRetrier<>(mySupplier, retryCount));
    

    In order to clean your context, just add a whenComplete() call on the resulting future. This will be executed whatever the result of the future.

    future.whenComplete((r, e) -> {
        try {
            localContext.close();
        } catch (Exception e2) {
            throw new RuntimeException("Failed to close context", e2);
        }
    });