Search code examples
javapostgresqlspring-integration

Spring Integration multiple transactional pollers with separate thread pools


I am using Spring Integration to implement a project for processing messages/events with the following requirements: an inbound message that comes in to the system should never be lost, even in the case of application crashes. When the processing either finishes successfully or fails, it should be logged for auditing purposes. These logs should also never be lost, even in the case of application crashes. It should also be reasonably performant.

I set up my project in the following way - I am using three Queue channels backed by a PostgreSQL message store. The main queue is used to store incoming messages, and its transactional poller will start the message flow. Transient errors are handled and retried inside the flow itself, using RetryAdvice, so if at any point the flow throws an exception, the transaction should not be rolled back, but instead the message should go to the error channel Queue which is also persistent. The error channel and success channel Queue also have transactional pollers setup in the same way. Also to make it more performant, every transactional poller was set up with a fixed thread pool.

The problem with this setup is when the application runs the database connection timeouts:

Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30001ms.
    at ...

Pollers are configured like this:

Pollers.fixedDelay(50).maxMessagesPerPoll(-1)
            .advice(
                TransactionInterceptorBuilder()
                    .transactionManager(transactionManager)
                    .isolation(Isolation.READ_COMMITTED)
                    .propagation(Propagation.REQUIRED)
                    .transactionAttribute(object : DefaultTransactionAttribute() {
                        override fun rollbackOn(ex: Throwable): Boolean {
                            return false
                        }
                    })
                    .build()
            )
            .taskExecutor(Executors.newFixedThreadPool(10))

I tried playing around with fixedRate and maxMessagesPerPoll but this doesn't seem to fix the issue. When i remove the task executor the connection problem disappears but the performance is not great to say the least. I suspect the performance issue is also related to the SELECT FOR UPDATE query the poller is using to fetch new messages that blocks other threads until the flow is complete. Would it be feasible to configure the poller to use something like SKIP LOCKED as a workaround?


Solution

  • The fixedDelay(50) is too fast to interact with DB. Since you have already maxMessagesPerPoll(-1), there is no reason to look into DB so often. With that fast pace you really exhaust your connection pool, therefore the error you get.

    With the maxMessagesPerPoll(-1) (which is default for PollingConsumer), all the records are fetched with one connection.

    If SKIP LOCKED works for you, then indeed it is better to use it, since all those records in process are not going to be fetched and no transaction will wait for their FOR UPDATE to be unlocked.

    You may also rethink the architecture to not use so many QeueueChannel against the same DB. You may have only one of them and process data in one transaction and same thread. The maxMessagesPerPoll might be as the size of connection pool.

    The TaskExecutor is used for a single polling cycle, all the maxMessagesPerPoll are loaded in one thread, but since you use transactions, it might be the fact that a new connection is for each message from a pool.

    UPDATE

    I think sometime it is better to talk via code instead of trying to explain things using words. So, the whole logic about poller is here:

    private Runnable createPoller() {
        return () ->
                this.taskExecutor.execute(() -> {
                    int count = 0;
                    while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                        if (this.maxMessagesPerPoll == 0) {
                            logger.info("Polling disabled while 'maxMessagesPerPoll == 0'");
                            break;
                        }
                        if (pollForMessage() == null) {
                            break;
                        }
                        count++;
                    }
                });
    }
    

    This Runnable is scheduled like this: taskScheduler.schedule(createPoller(), this.trigger);. So, every single time the fixedDelay triggers this method we got:

    1. A task is executed on the other tread.
    2. That task performs a loop until a polled message is null or we reach maxMessagesPerPoll.

    The transaction you mentioned before is added around doPoll() which calls receiveMessage() and then handleMessage(message). All that happens within a thread from that executor. And that thread is held until we reach maxMessagesPerPoll.

    Typically we use a taskExecutor om a poller to free TaskScheduler for other work. But looks like you have a lot for the same Postgres DB, so that's where you fail: you just exhaust a connection pool with so many queries.