Running out of pooled connections using CompletableFuture and Spring transactions


I am trying to load a database into memory quickly using CompletableFutures. I start the Spring transaction at the method level:

@Transactional()
    private void loadErUp() {
        StopWatch sw = StopWatch.createStarted();
        List<CompletableFuture<Void>> calls = new ArrayList<>();
        final ZonedDateTime zdt = ZonedDateTime.now(ZoneId.of(ZoneOffset.UTC.getId())).minusMinutes(REFRESH_OVERLAP);

        for (long i = 1; i < 12 + 1; i++) {
            Long holder = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                this.loadPartition(holder, zdt);
            }, this.forkJoinPool);
            calls.add(future);
        }
        CompletableFuture.allOf(calls.toArray(new CompletableFuture[0])).join();
        log.info("All data refreshed in ({}ms) since:{}", sw.getTime(), zdt.format(DateTimeFormatter.ISO_INSTANT));
    }

And then attach each thread to the main transaction via

TransactionSynchronizationManager.setActualTransactionActive(true);

private <T> long loadPartition(long partitionKey, ZonedDateTime zdt) {
        log.debug("Refresh thread start:{}", partitionKey);
        TransactionSynchronizationManager.setActualTransactionActive(true);
        StopWatch sw = StopWatch.createStarted();

        try (Stream<Authority> authorityStream = aSqlRepo.findByPartitionKeyAndLastUpdatedTimeStampAfter(partitionKey, zdt)) {
            long count = authorityStream.peek(a -> {
                this.authorityRepository.set(this.GSON.fromJson(a.getJsonData(), AssetAuthority.class));
            }).count();
            log.info("Partition {} refreshed in ({}ms) with {} items.", partitionKey, sw.getTime(), count);
            return count;
        }
    }

I run this batch job every 30 seconds. On the 9th run I get 4 threads and then it hangs, because the first 8 runs hold 12 * 8 = 96 connections, those 4 threads take the pool to its limit of 100, and the rest wait for a connection to be released. I get:

Unable to acquire JDBC Connection; Unable to fetch a connection in 30 seconds, none available[size:100; busy:100; idle:0; lastwait:30000].

So obviously the connections are not committing. I thought it might be because I have my own ForkJoinPool; however, I shut down all those threads and it didn't seem to help. I have also put another method under the loadPartition() method, but that didn't seem to help either. There is another thread that talks about how to get the transactions to work, but mine work; they just don't commit.


Solution

  • If you want to have each #loadPartition run on its own thread and in its own transaction, you'll need to:

    1. Mark #loadPartition as @Transactional.
    2. Invoke the proxied #loadPartition method so that the @Transactional works. You can do this either by self-autowiring or by calling the method from another proxied class.

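    The transaction is not propagated to the asynchronous threads for two reasons. Spring binds a transaction to the thread that started it, so worker threads never see the transaction opened by #loadErUp. And (important!) the call this.loadPartition(...) bypasses the Spring proxy, so no transactional advice runs on the worker threads either. Calling TransactionSynchronizationManager.setActualTransactionActive(true) only sets a thread-local flag; it does not start a transaction or join an existing one.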
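
    Why a transaction opened on the calling thread is invisible to the workers can be illustrated without Spring. The sketch below uses a plain ThreadLocal as a hypothetical stand-in for the per-thread state a transaction manager keeps; the class name, field name, and "tx-1" value are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadBoundContextDemo {

    // Hypothetical stand-in for thread-bound transaction state; not a Spring class.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Binds a value on the calling thread and returns what a worker thread sees. */
    public static String valueSeenByWorker() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            CURRENT_TX.set("tx-1"); // visible only to the calling thread
            return CompletableFuture
                    .supplyAsync(() -> String.valueOf(CURRENT_TX.get()), pool)
                    .join();
        } finally {
            CURRENT_TX.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The worker runs on a different thread, so it finds nothing bound.
        System.out.println("worker sees: " + valueSeenByWorker()); // prints "worker sees: null"
    }
}
```

    Each worker therefore needs its own transaction, started on the worker thread itself.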

    With both changes applied, the class will look like:

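    import java.time.ZonedDateTime;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Lazy;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;

    @Component
    public class MyLoaderClass {
    
        // Self-injected reference to this bean's proxy. Field injection is shown;
        // @Lazy is an assumption that avoids a circular-reference failure on
        // setups that reject self-references.
        @Lazy
        @Autowired
        private MyLoaderClass myLoaderClass;
    
        // Removed @Transactional annotation
        public void loadErUp() {
            // Inside the runAsync lambda: call through the proxy, not this.loadPartition(...)
            myLoaderClass.loadPartition(holder, zdt);
            ...
        }
    
        // 1) Add the @Transactional annotation to #loadPartition
        // 2) Make it public so the proxy can intercept it (or move it to a separate bean)
        @Transactional
        public <T> long loadPartition(long partitionKey, ZonedDateTime zdt) {
            ...
            // Can remove the TransactionSynchronizationManager.setActualTransactionActive(true) call
            ...
        }
    
    }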
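
    The effect of calling through the proxy versus calling this.loadPartition(...) can be seen with a JDK dynamic proxy. This is only a sketch of the general proxy mechanism, not of Spring's internals: the Loader interface, LoaderImpl class, and the recording handler are hypothetical, and the handler stands in for the point where transactional advice would begin and commit.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class SelfInvocationDemo {

    /** Hypothetical service interface; Spring is not involved here. */
    public interface Loader {
        void loadAll();
        void loadPartition(long key);
    }

    /** Target object. When 'self' is set, loadAll() calls through the proxy. */
    public static class LoaderImpl implements Loader {
        Loader self; // null models a plain this.loadPartition(...) call

        @Override
        public void loadAll() {
            Loader target = (self != null) ? self : this;
            target.loadPartition(1L);
        }

        @Override
        public void loadPartition(long key) {
            // real work would happen here
        }
    }

    /** Returns the names of the methods the proxy intercepted. */
    public static List<String> intercepted(boolean injectSelf) {
        List<String> seen = new ArrayList<>();
        LoaderImpl impl = new LoaderImpl();
        Loader proxy = (Loader) Proxy.newProxyInstance(
                Loader.class.getClassLoader(),
                new Class<?>[] {Loader.class},
                (p, method, args) -> {
                    seen.add(method.getName()); // advice would run here
                    return method.invoke(impl, args);
                });
        if (injectSelf) {
            impl.self = proxy; // the equivalent of self-autowiring
        }
        proxy.loadAll();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(intercepted(false)); // [loadAll]
        System.out.println(intercepted(true));  // [loadAll, loadPartition]
    }
}
```

    Without the self-reference, the inner call never reaches the handler, which mirrors why @Transactional on a method called via this has no effect.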
    

    You'll also want to make sure a new batch run doesn't start before the previous one has completed. You can solve this by scheduling the table load with the @Scheduled annotation and its fixedDelay attribute, which measures the delay from the end of the previous run, so the runs don't "overlap".
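
    The fixed-delay behaviour can be tried out with the JDK's own scheduler, used here as a stand-in for @Scheduled(fixedDelay = ...). In this minimal sketch the class name, the 50 ms job duration, and the 10 ms delay are arbitrary assumptions; the job deliberately takes longer than the delay to show that runs still do not overlap.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedDelayDemo {

    /** Runs a slow job at least 'runs' times and returns the peak number of concurrent runs. */
    public static int maxConcurrentRuns(int runs) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        // Several threads are available, so any overlap would be possible if scheduled.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                int now = active.incrementAndGet();
                maxActive.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(50); // the job takes longer than the 10 ms delay
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    done.countDown();
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return maxActive.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent runs: " + maxConcurrentRuns(3));
    }
}
```

    Because the delay is counted from the end of each run, the next run is only scheduled once the previous one has finished. A fixed-rate schedule gives no such gap between runs.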