I am trying to implement a database query with a retry strategy for certain database exceptions. The code for the retry strategy is not very relevant, so I did not include it. As you can see in the code below, I have written a retryCallable in populateData() which takes the retry strategy and the Callable.
In getDataFromDB, I get the data from the DB; the data is then put in a global hash map which serves as an application-level cache.
This code works as expected. I would like to invoke populateData from a different class. However, this would be a blocking call. Since it goes to the database and has a retry strategy, it could be slow, so I want to call populateData asynchronously.
How can I use CompletableFuture or FutureTask to achieve this? CompletableFuture.runAsync expects a Runnable and CompletableFuture.supplyAsync expects a Supplier. I have not implemented these things before, so any advice on best practices would be helpful.
import java.util.Set;
import java.util.concurrent.Callable;

class TestCallableRetry {
    // Callable.call() declares a checked Exception, so it is propagated here
    public void populateData() throws Exception {
        final Callable<Set<String>> retryCallable =
                new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
        Set<String> data = retryCallable.call();
        if (data != null && !data.isEmpty()) {
            // store data in a global hash map
        }
    }

    // Return type matches the Callable<Set<String>> that is actually created
    private Callable<Set<String>> getDataFromDB() {
        return new Callable<Set<String>>() {
            @Override
            public Set<String> call() {
                // returns data from database
            }
        };
    }
}
class InvokeCallableAsynchronously {
    public void putDataInGlobalMap() {
        // call populateData asynchronously
    }
}
If you split your populateData method into two parts, a Supplier to fetch the data and a Consumer to store it, it will be easy to chain them with a CompletableFuture.
// Signature compatible with Supplier<Set<String>>
private Set<String> fetchDataWithRetry() {
    final RetryingCallable<Set<String>> retryCallable =
            new RetryingCallable<>(retryStrategyToRetryOnDBException(), getDataFromDB());
    try {
        return retryCallable.call();
    } catch (Exception e) {
        log.error("Call to database failed", e);
        return Collections.emptySet();
    }
}

// Signature compatible with Consumer<Set<String>>
private void storeData(Set<String> data) {
    if (!data.isEmpty()) {
        // store data in a global hash map
    }
}
Then, in populateData():
private final ExecutorService executor = Executors.newCachedThreadPool();

public void populateData() {
    CompletableFuture
        .supplyAsync(this::fetchDataWithRetry, executor)
        .thenAccept(this::storeData);
}
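If the calling class needs to know when the cache has been filled, or wants to react to a failure in the store step, one option is to return the future from populateData() instead of discarding it. The following is only a sketch under assumptions: AsyncCacheLoader, CACHE, and the hard-coded result of fetchDataWithRetry() are hypothetical stand-ins for the real class, the global hash map, and the RetryingCallable-based fetch.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncCacheLoader {
    // Hypothetical stand-in for the application-level cache from the question.
    static final Map<String, Boolean> CACHE = new ConcurrentHashMap<>();

    private final ExecutorService executor = Executors.newCachedThreadPool();

    // Hypothetical stand-in for the RetryingCallable-based database fetch.
    private Set<String> fetchDataWithRetry() {
        return new HashSet<>(Arrays.asList("building-1", "building-2"));
    }

    private void storeData(Set<String> data) {
        for (String key : data) {
            CACHE.put(key, Boolean.TRUE);
        }
    }

    // Returning the future lets the caller decide whether to chain, wait, or ignore it.
    public CompletableFuture<Void> populateData() {
        return CompletableFuture
                .supplyAsync(this::fetchDataWithRetry, executor)
                .thenAccept(this::storeData);
    }

    public void shutdown() {
        executor.shutdown();
    }
}

class InvokeCallableAsynchronously {
    public static void main(String[] args) {
        AsyncCacheLoader loader = new AsyncCacheLoader();
        CompletableFuture<Void> done = loader.populateData()
                .exceptionally(e -> {
                    // Reached if fetching or storing threw an exception.
                    System.err.println("Populating the cache failed: " + e);
                    return null;
                });
        // The calling thread is free to do other work here.
        done.join(); // demo only: wait so the result can be printed before exiting
        System.out.println(AsyncCacheLoader.CACHE.keySet());
        loader.shutdown();
    }
}
```

The join() call is there only so the demo can print the result; in the application the caller would normally just keep the returned future or attach further stages to it.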
Using the version of supplyAsync that takes an Executor is optional. If you use the single-argument version, your task will run in the common pool (ForkJoinPool.commonPool()), which is OK for short-running tasks but not for tasks that block, such as a database call with retries.
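To see which executor a task ends up on, it can help to give the dedicated pool recognizable thread names. The sketch below is an illustration rather than part of the original code: DbExecutorDemo, newDbExecutor, and the "db-loader-" prefix are made-up names, and a small fixed pool is used here instead of a cached pool simply to cap the number of concurrent database calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class DbExecutorDemo {
    // Hypothetical helper: a small fixed pool whose threads carry a recognizable name.
    static ExecutorService newDbExecutor(int threads) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(threads, runnable -> {
            Thread t = new Thread(runnable, "db-loader-" + counter.incrementAndGet());
            t.setDaemon(true); // do not keep the JVM alive just for cache loading
            return t;
        });
    }

    // Two-argument supplyAsync: the supplier runs on a thread of the given executor.
    static String threadUsedWith(ExecutorService executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
    }

    // Single-argument supplyAsync: the supplier runs on the default async executor.
    static String threadUsedWithDefault() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        ExecutorService dbExecutor = newDbExecutor(2);
        System.out.println("two-arg:    " + threadUsedWith(dbExecutor));
        System.out.println("single-arg: " + threadUsedWithDefault());
        dbExecutor.shutdown();
    }
}
```

Whichever pool is chosen, it should be shut down when the application stops, or built from daemon threads as in this sketch, so that idle worker threads do not delay JVM exit.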