Tags: hibernate, transactions, quarkus, quarkus-panache

Quarkus Panache @Transactional: how to close DB connections properly


I have a problem with transactions in Quarkus Panache when running a large number of transactions in parallel.

This is my sample code:

public Uni<Void> executeLotsOfTransactions() {
    Multi<Integer> multi = Multi.createFrom().range(0, 1000);

    return multi
            .onItem().transformToUniAndMerge(number -> {
                return Uni.createFrom()
                        .deferred(() -> executeSomeTransaction(number))
                        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
                        .emitOn(MutinyHelper.executor(Vertx.currentContext()));
            })
            .collect().asList()
            .replaceWithVoid();
}

@Transactional
Uni<Void> executeSomeTransaction(Integer num) {
    Uni<Item> uniItem = someRepo.findSomeData(num); // basically find()
    return uniItem
            .chain(item -> {
                // do some calculation & update with item
                return someRepo.updateItem(item); // basically persist() or update() item
            })
            .onFailure().retry().withBackOff(Duration.ofSeconds(5)).atMost(3);
}

Note that I use .deferred() and .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) so that the JTA transaction starts on the worker pool.

Some of the someRepo.updateItem(item) calls succeed, but many of them fail with this exception:

javax.persistence.PersistenceException: org.hibernate.exception.GenericJDBCException: Unable to acquire JDBC Connection

This is the repo code:

@Override
public Uni<Void> updateItem(Item item) {
    return Uni.createFrom()
            .voidItem().invoke(() -> updateItemNonUni(item))
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .emitOn(MutinyHelper.executor(Vertx.currentContext()));
}

@Transactional(REQUIRED)
void updateItemNonUni(Item item) {
    // this approach is not optimal, should rework
    Item itemFromDB = findById(item.getID());
    itemFromDB.setBalance(item.getBalance());
    itemFromDB.setStatus(item.getStatus());
    itemFromDB.setUpdatedDate(LocalDateTime.now());
    persist(itemFromDB);
}

At first I thought it was because I run the transactions in parallel, so I added a retry with .onFailure().retry(), but this did not help: the number of failing operations did not decrease.

So I suspected that the connection used by another executeSomeTransaction call is not closed yet, even though that call has already finished.

I think this has something to do with how Uni works, but I'm not sure what I did wrong here.

So my question is: how can I close the DB connections properly?

Edit: I'm using non-reactive Hibernate ORM and Quarkus 2.16.


Solution

  • Based on @ozkanpakdil's comment, I limited the concurrency of my inserts, and now it works.

    This is roughly the solution:

    public Uni<Void> insertAmounts(List<SomeEntity> itemList) {
        return Multi.createFrom().iterable(ListUtils.partition(itemList, 2000))
                .emitOn(Infrastructure.getDefaultWorkerPool())
                .onItem().transformToUni(batchedList -> Uni.createFrom().voidItem()
                        .invoke(() -> InsertManyItemsNonUni(batchedList))
                )
                .merge(50)
                .collect().asList()
                .replaceWithVoid()
                .emitOn(MutinyHelper.executor(Vertx.currentContext()));
    }
    
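    void InsertManyItemsNonUni(List<SomeEntity> itemList) {
        // Reuse the caller's transaction if one is active, otherwise start a new one.
        if (QuarkusTransaction.isActive()) {
            QuarkusTransaction.joiningExisting();
        } else {
            QuarkusTransaction.begin();
        }
        // Persist the whole batch in one transaction, then commit it.
        repositoryWriter.persist(itemList);
        QuarkusTransaction.commit();
    }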
    

    Note: the maximum size of my connection pool is 100, so the merge concurrency of 50 stays below it.
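
Why limiting concurrency helps can be illustrated without Quarkus. The following is only a rough sketch in plain Java, not the actual Quarkus or Agroal behaviour: a Semaphore stands in for the JDBC connection pool, a fixed thread pool stands in for the merge concurrency, and the class name PoolDemo, the method run, and the timings (5 ms acquisition timeout, 50 ms per fake transaction) are all made up for the illustration. It assumes the "Unable to acquire JDBC Connection" error comes from more tasks asking for a connection at once than the pool can hand out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolDemo {

    /**
     * Runs `tasks` fake transactions with at most `concurrency` in flight
     * against a simulated pool of `poolSize` connections.
     * Returns how many tasks failed to acquire a connection.
     */
    static int run(int tasks, int concurrency, int poolSize) {
        Semaphore pool = new Semaphore(poolSize); // hypothetical stand-in for the JDBC pool
        AtomicInteger failures = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(concurrency);
        List<Future<?>> futures = new ArrayList<>();

        for (int i = 0; i < tasks; i++) {
            futures.add(workers.submit(() -> {
                try {
                    // Give up quickly, like a pool with a short acquisition timeout.
                    if (!pool.tryAcquire(5, TimeUnit.MILLISECONDS)) {
                        failures.incrementAndGet();
                        return;
                    }
                    try {
                        Thread.sleep(50); // pretend to do the transactional work
                    } finally {
                        pool.release(); // the "connection" goes back to the pool
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        try {
            for (Future<?> f : futures) {
                f.get(); // wait for every task to finish
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
        return failures.get();
    }

    public static void main(String[] args) {
        // Concurrency below the pool size: every task gets a connection.
        System.out.println("bounded failures:   " + run(20, 4, 5));
        // Concurrency above the pool size: some tasks time out waiting.
        System.out.println("unbounded failures: " + run(20, 20, 5));
    }
}
```

In this sketch, as long as the number of concurrent tasks does not exceed the pool size, a free permit is always available when a task asks for one, which mirrors running merge(50) against a pool of 100. Retrying does not fix the unbounded case on its own, because the retries compete for the same limited set of connections.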