Search code examples
javaspring-boottransactionsspring-transactionsforkjoinpool

Propagate transaction to Forkjoin submit


I'm create a ForkJoinPool with a number of thread to execute a stream parallel, that is executed from a query in jpa, but I'm have trouble with the Transactional propagate to the method submit of ForkJoinPool.

@Transactional(readOnly = true)
public void streamTest() {
    ForkJoinPool customThreadPool = new ForkJoinPool(20);
    try {
    customThreadPool.submit(() ->
         priceRepository.streamAll()
         .parallel()
         .map(p -> this.transform(p))
         .forEach(System.out::println)
         ).get();
    } catch (InterruptedException | ExecutionException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

I'm getting the error: "You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction."

If I take off the ForkJoinPool to execute the stream, it's works fine. How can I propagate the transaction(readOnly) to the execution of the method submit from ForkJoinPool, is there a way?


Solution

  • I discovered how to set transactional inside the task of ForkJoinPool. I only have to use the TransactionSynchronizationManager as I did below.

    @Transactional(readOnly = true)
    public void streamTest() {
    ForkJoinPool customThreadPool = new ForkJoinPool(20);
    try {
    customThreadPool.submit(() -> {
        TransactionSynchronizationManager.setActualTransactionActive(true);
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(true);
        TransactionSynchronizationManager.initSynchronization();
         priceRepository.streamAll()
         .parallel()
         .map(p -> this.transform(p))
         .forEach(System.out::println);
         }).get();
    } catch (InterruptedException | ExecutionException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    

    }