I am trying to load a database into memory quickly using CompleteableFutures. I start the Spring transaction at the method level:
@Transactional()
private void loadErUp() {
StopWatch sw = StopWatch.createStarted();
List<CompletableFuture<Void>> calls = new ArrayList<>();
final ZonedDateTime zdt = ZonedDateTime.now(ZoneId.of(ZoneOffset.UTC.getId())).minusMinutes(REFRESH_OVERLAP);
for (long i = 1; i < 12 + 1; i++) {
Long holder = i;
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
this.loadPartition(holder, zdt);
}, this.forkJoinPool);
calls.add(future);
}
CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();
log.info("All data refreshed in ({}ms) since:{}", sw.getTime(), zdt.format(DateTimeFormatter.ISO_INSTANT));
}
And then attach each thread to the main transaction via
TransactionSynchronizationManager.setActualTransactionActive(true);
private <T> long loadPartition(long partitionKey, ZonedDateTime zdt) {
log.debug("Refresh thread start:{}", partitionKey);
TransactionSynchronizationManager.setActualTransactionActive(true);
StopWatch sw = StopWatch.createStarted();
try (Stream<Authority> authorityStream = aSqlRepo.findByPartitionKeyAndLastUpdatedTimeStampAfter(partitionKey, zdt)) {
long count = authorityStream.peek(a -> {
this.authorityRepository.set(this.GSON.fromJson(a.getJsonData(), AssetAuthority.class));
}).count();
log.info("Partition {} refreshed in ({}ms) with {} items.", partitionKey, sw.getTime(), count);
return count;
}
}
So I run this batch job every 30 seconds and in the 9th run I get 4 threads and then it hangs (12*8 runs = 96) because it is waiting for a pool to come open. I get:
Unable to acquire JDBC Connection; Unable to fetch a connection in 30 seconds, none available[size:100; busy:100; idle:0; lastwait:30000].
So obviously the connections are not committing. I thought it might be because I have my own ForkJoinPool, however, I shutdown all those threads and it didn't seem to help. I have also put an other method under the loadPartition() method but that didn't seem to help either. There is another thread that talks about how to get the transactions to work, but mine work, they just don't commit.
If you want to have each #loadPartition
run on it's own thread and in it's own transaction, you'll need to:
#loadPartition
as @Transactional
#loadPartition
method so that the @Transactional
works. You can do this either by self-autowiring or calling the method from another proxied classThe transaction is not getting propagated to the asynchronous threads because (important!) that method is not getting proxied.
So it will look like:
@Component
public class MyLoaderClass {
// Autowire in this with constructor injection or @Autowired
private MyLoaderClass myLoaderClass;
// Removed @Transactional annotation
public void loadErUp() {
myLoaderClass.loadPartition(holder, zdt);
...
}
// 1) Add the @Transactional annotation to #loadPartition
// 2) Make public to use self-autowiring (or refactored class, per link above)
@Transactional
public <T> long loadPartition(long partitionKey, ZonedDateTime zdt) {
...
// Can remove TransactionSyncManager call
...
}
}
You'll also want to make sure your batch job doesn't run without making sure the last job completed. You can easily solve this by using the @Scheduled
annotation for your table load to ensure the runs don't "overlap".