I'm create a ForkJoinPool with a number of thread to execute a stream parallel, that is executed from a query in jpa, but I'm have trouble with the Transactional propagate to the method submit of ForkJoinPool.
@Transactional(readOnly = true)
public void streamTest() {
ForkJoinPool customThreadPool = new ForkJoinPool(20);
try {
customThreadPool.submit(() ->
priceRepository.streamAll()
.parallel()
.map(p -> this.transform(p))
.forEach(System.out::println)
).get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
I'm getting the error: "You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction."
If I take off the ForkJoinPool to execute the stream, it's works fine. How can I propagate the transaction(readOnly) to the execution of the method submit from ForkJoinPool, is there a way?
I discovered how to set transactional inside the task of ForkJoinPool. I only have to use the TransactionSynchronizationManager as I did below.
@Transactional(readOnly = true)
public void streamTest() {
ForkJoinPool customThreadPool = new ForkJoinPool(20);
try {
customThreadPool.submit(() -> {
TransactionSynchronizationManager.setActualTransactionActive(true);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(true);
TransactionSynchronizationManager.initSynchronization();
priceRepository.streamAll()
.parallel()
.map(p -> this.transform(p))
.forEach(System.out::println);
}).get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}