Tags: java, runnable, callable, completable-future

CompletableFuture: Invoke a void function asynchronously


I am trying to implement a database query with a retry strategy for certain database exceptions. The retry strategy code is not very relevant, so I did not include it. As the code below shows, populateData() builds a retryCallable from the retry strategy and the Callable that queries the database.

In getDataFromDB, I fetch the data from the DB. populateData then puts it in a global hash map that serves as an application-level cache.

This code is working as expected. I would like to invoke populateData from a different class. However, this would be a blocking call. Since it queries the database and uses a retry strategy, it could be slow. I want to call populateData asynchronously.

How can I use CompletableFuture or FutureTask to achieve this? CompletableFuture.runAsync expects a Runnable. CompletableFuture.supplyAsync expects a Supplier. I have not used these before, so any advice on best practices would be helpful.

class TestCallableRetry {

    public void populateData() {
        final Callable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
        Set<String> data = new HashSet<>();

        data = retryCallable.call();

        if (data != null && !data.isEmpty()) {
            // store data in a global hash map
        }
    }

    private Callable<Set<String>> getDataFromDB() {
        return new Callable<Set<String>>() {
            @Override
            public Set<String> call() {
                // returns data from database
            }
        };
    }
}

class InvokeCallableAsynchronously {
    public void putDataInGlobalMap() {
        // call populateData asynchronously
    }
}

Solution

  • If you split your populateData method into two parts, a Supplier that fetches the data and a Consumer that stores it, you can chain them with a CompletableFuture.

    // Signature compatible with Supplier<Set<String>> 
    private Set<String> fetchDataWithRetry() {
        final RetryingCallable<Set<String>> retryCallable = new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
        try {
            return retryCallable.call();
        } catch (Exception e) {
            log.error("Call to database failed", e);
            return Collections.emptySet();
        }
    }
    
    // Signature compatible with Consumer<Set<String>>
    private void storeData(Set<String> data) {
        if (!data.isEmpty()) {
            // store data in a global hash map
        }
    }
    

    Then, in populateData():

    private ExecutorService executor = Executors.newCachedThreadPool();
    
    public void populateData() {
        CompletableFuture
            .supplyAsync(this::fetchDataWithRetry, executor)
            .thenAccept(this::storeData);
    }
    

    Passing an Executor to supplyAsync is optional. If you use the single-argument version, your task runs in the common pool (ForkJoinPool.commonPool()), which is fine for short-running tasks but not for tasks that block.
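
To see how the caller's side might look, here is a minimal self-contained sketch, not the asker's actual classes. The class name AsyncPopulateSketch, the in-memory cache, and the hard-coded "building-1"/"building-2" values are all hypothetical stand-ins for the database and the global hash map. It returns the CompletableFuture so a caller can wait on it, chain onto it, or ignore it. It also shows the runAsync alternative: a void no-argument method such as the original populateData fits Runnable, so it can be passed as a method reference without splitting it up.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncPopulateSketch {
    // Dedicated pool so blocking database calls stay off the common pool.
    private final ExecutorService executor = Executors.newCachedThreadPool();

    // Hypothetical stand-in for the application-level cache.
    private final Set<String> cache = ConcurrentHashMap.newKeySet();

    // Hypothetical stand-in for fetchDataWithRetry(); a real version
    // would wrap the database call in the retry logic.
    private Set<String> fetchDataWithRetry() {
        return new HashSet<>(Arrays.asList("building-1", "building-2"));
    }

    private void storeData(Set<String> data) {
        if (!data.isEmpty()) {
            cache.addAll(data);
        }
    }

    // Variant 1: Supplier + Consumer chained, as in the answer above.
    // Returning the future lets the caller decide whether to wait.
    CompletableFuture<Void> populateData() {
        return CompletableFuture
            .supplyAsync(this::fetchDataWithRetry, executor)
            .thenAccept(this::storeData);
    }

    // The original blocking, void-returning shape of the method.
    void populateDataBlocking() {
        storeData(fetchDataWithRetry());
    }

    // Variant 2: a void no-arg method is compatible with Runnable,
    // so runAsync can invoke it unchanged.
    CompletableFuture<Void> populateDataViaRunAsync() {
        return CompletableFuture.runAsync(this::populateDataBlocking, executor);
    }

    Set<String> cachedData() {
        return cache;
    }

    // Cached-pool threads are non-daemon; shut down so the JVM can exit.
    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        AsyncPopulateSketch sketch = new AsyncPopulateSketch();
        CompletableFuture<Void> done = sketch.populateData(); // returns immediately
        done.get(5, TimeUnit.SECONDS); // waiting here only for the demo
        System.out.println(sketch.cachedData().size()); // prints 2
        sketch.shutdown();
    }
}
```

In real code the caller would usually not call get(); it would either ignore the returned future or attach a callback such as thenRun or exceptionally to it.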