Tags: java, spring-integration, spring-integration-http

Spring Integration: send messages to an Executor in a transaction


I have a huge number of messages coming from CSV files that are then sent to a rate-limited API. I'm using a Queue Channel backed by a database channel message store to make the messages durable while they are processed. I want to get as close to the rate limit as possible, so I need to send messages to the API from multiple threads.

What I had in mind was something that reads the DB, sees which messages are available, and then delegates each message to one of the threads to be processed in a transaction.

But I haven't been able to do that. What I've had to do instead is use a transactional poller with a thread pool of N threads, a fixed rate of, say, 5 seconds, and a max messages per poll of 10 (more than could be processed in 5 seconds). This works OK, but it has problems when there are not many messages waiting: if there were 10 messages, they would all be processed by a single thread. That isn't going to be a problem in practice, because we will have thousands of messages, but it seems conceptually more complex than how I thought it should work.
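For reference, the setup described above might look roughly like this with the Spring Integration 5.x Java DSL. This is a sketch, not the asker's actual configuration: the channel name `csvRecordChannel`, the group id `csvRecords`, the `rateLimitedApiClient` bean with its `send` method, the PostgreSQL query provider, the pool size of 4, and the presence of a `PlatformTransactionManager` for the same `DataSource` (for example a `DataSourceTransactionManager`) are all assumptions.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class CsvToApiFlowConfig {

    // Durable store for queued messages (assumes PostgreSQL; pick the query provider for your DB)
    @Bean
    public JdbcChannelMessageStore channelMessageStore(DataSource dataSource) {
        JdbcChannelMessageStore store = new JdbcChannelMessageStore(dataSource);
        store.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
        return store;
    }

    // QueueChannel whose contents live in the message store under the (hypothetical) group id "csvRecords"
    @Bean
    public MessageChannel csvRecordChannel(JdbcChannelMessageStore channelMessageStore) {
        return MessageChannels.queue(channelMessageStore, "csvRecords").get();
    }

    // The "N threads" the poll cycles run on (4 is an arbitrary example)
    @Bean
    public ThreadPoolTaskExecutor pollerExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(4);
        return executor;
    }

    @Bean
    public IntegrationFlow apiFlow(PlatformTransactionManager transactionManager) {
        return IntegrationFlows.from("csvRecordChannel")
                // hypothetical bean "rateLimitedApiClient" whose send(...) method calls the API
                .handle("rateLimitedApiClient", "send", e -> e
                        .poller(Pollers.fixedRate(5000)              // start a poll cycle every 5 s
                                .maxMessagesPerPoll(10)               // each cycle handles up to 10 messages
                                .taskExecutor(pollerExecutor())       // cycles run on the pool above
                                .transactional(transactionManager))) // receive + handle inside a transaction
                .get();
    }
}
```

With `maxMessagesPerPoll(10)`, each trigger hands one poll cycle to the executor, and that cycle keeps receiving on a single thread until it has handled 10 messages or the store is empty, which is the single-thread behaviour described above.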

I might not have explained this very well, but isn't this a common problem when messages come in fast but go out more slowly?


Solution

Your solution is actually correct. Just don't hand messages off to an Executor, because doing so moves their processing outside the transaction boundaries.
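Spring's transaction support binds the current transaction to the thread (`TransactionSynchronizationManager` keeps it in `ThreadLocal`s), which is why a hand-off to an Executor escapes it. A plain-JDK sketch of the effect, with a `ThreadLocal` standing in for the bound transaction (`ThreadBoundTxDemo` and `CURRENT_TX` are hypothetical names, not Spring APIs):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ThreadBoundTxDemo {

    // Hypothetical stand-in for Spring's thread-bound transaction state; not a Spring API
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Returns the transaction the handler sees, either inline or after a hand-off to an executor. */
    static String handle(boolean handOffToExecutor) {
        CURRENT_TX.set("tx-1"); // the poller thread "begins" a transaction
        try {
            if (!handOffToExecutor) {
                return String.valueOf(CURRENT_TX.get()); // same thread: the transaction is visible
            }
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                // a different thread: nothing is bound there, so this yields "null"
                Future<String> seen = executor.submit(() -> String.valueOf(CURRENT_TX.get()));
                return seen.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e);
            } finally {
                executor.shutdown();
            }
        } finally {
            CURRENT_TX.remove(); // the poller thread "ends" the transaction
        }
    }

    public static void main(String[] args) {
        System.out.println("inline:   " + handle(false)); // tx-1
        System.out.println("hand-off: " + handle(true));  // null
    }
}
```

In a real transactional poller the consequence would typically be worse than a missing value: the poller's transaction, which also removes the message from the JDBC message store, can commit as soon as the hand-off returns while the API call is still running on another thread, so a crash at that moment loses the message.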

The fact that 10 messages are processed in the same thread is just an implementation detail. It comes from this loop in AbstractPollingEndpoint:

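    AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
        int count = 0;
        while (AbstractPollingEndpoint.this.initialized
                && (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0
                || count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) {
            try {
                // one receive-and-handle attempt; returns false when no message was received
                if (!Poller.this.pollingTask.call()) {
                    break;
                }
                count++;
            }
            catch (Exception e) {
                // ... exception handling omitted from this excerpt
            }
        }
    });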

So messages are polled in the same thread until maxMessagesPerPoll is reached (or until a poll returns no message).
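To see why 10 waiting messages end up on one thread, here is a plain-Java model of that loop with no Spring involved (`PollCycleDemo`, `pollCycle`, and `simulate` are hypothetical). Each call to `pollCycle` stands for one trigger firing, and the cycle drains the queue until it hits `maxMessagesPerPoll` or finds the queue empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class PollCycleDemo {

    /** One poll cycle: drain up to maxMessagesPerPoll messages (<= 0 means unlimited) on the calling thread. */
    static List<String> pollCycle(Deque<String> queue, int maxMessagesPerPoll) {
        List<String> handled = new ArrayList<>();
        while (maxMessagesPerPoll <= 0 || handled.size() < maxMessagesPerPoll) {
            String message = queue.poll(); // null when the queue is empty
            if (message == null) {
                break; // nothing left: the cycle ends early
            }
            handled.add(message); // in the real endpoint the API call would happen here
        }
        return handled;
    }

    /** Runs `triggers` cycles in order; returns the messages each cycle handled. */
    static List<List<String>> simulate(int messages, int maxMessagesPerPoll, int triggers) {
        Deque<String> queue = new ArrayDeque<>();
        for (int i = 1; i <= messages; i++) {
            queue.add("m" + i);
        }
        List<List<String>> perCycle = new ArrayList<>();
        for (int t = 0; t < triggers; t++) {
            perCycle.add(pollCycle(queue, maxMessagesPerPoll));
        }
        return perCycle;
    }

    public static void main(String[] args) {
        // 10 waiting messages, maxMessagesPerPoll = 10: the first cycle takes everything
        System.out.println(simulate(10, 10, 4));
        // same backlog, maxMessagesPerPoll = 3: the messages are spread over several cycles
        System.out.println(simulate(10, 3, 4));
    }
}
```

With `simulate(10, 10, 4)` the first cycle takes all ten messages and the other three find nothing; with `simulate(10, 3, 4)` the cycles split them 3/3/3/1. A smaller `maxMessagesPerPoll` spreads a small backlog over more cycles (and therefore potentially more threads), at the cost of handling fewer messages per trigger.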

To get more parallelism while still keeping the transaction, so that no messages are lost, consider using fixedRate:

    /**
     * Specify whether the periodic interval should be measured between the
     * scheduled start times rather than between actual completion times.
     * The latter, "fixed delay" behavior, is the default.
     */
    public void setFixedRate(boolean fixedRate)
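To make that Javadoc concrete, a small plain-Java calculation of when successive polls start under each mode, assuming a 5 s period, polls that take 2 s, a first start at time 0, and runs that never overlap (`TriggerTimingDemo` and `startTimes` are hypothetical helpers, not Spring APIs):

```java
import java.util.ArrayList;
import java.util.List;

final class TriggerTimingDemo {

    /**
     * Start times (ms) of the first n runs of a periodic task whose runs each take durationMs.
     * Assumes runs never overlap and the first run starts at t = 0.
     */
    static List<Long> startTimes(boolean fixedRate, long periodMs, long durationMs, int n) {
        List<Long> starts = new ArrayList<>();
        long previousCompletion = 0;
        for (int i = 0; i < n; i++) {
            long start;
            if (fixedRate) {
                // fixed rate: the period is measured between scheduled starts (0, P, 2P, ...);
                // a run only starts late if the previous one overran
                start = Math.max(i * periodMs, previousCompletion);
            } else {
                // fixed delay (the default): the period is measured from the previous completion
                start = (i == 0) ? 0 : previousCompletion + periodMs;
            }
            starts.add(start);
            previousCompletion = start + durationMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println("fixed rate:  " + startTimes(true, 5_000, 2_000, 3));
        System.out.println("fixed delay: " + startTimes(false, 5_000, 2_000, 3));
    }
}
```

The fixed-rate starts come out as [0, 5000, 10000] and the fixed-delay starts as [0, 7000, 14000]: with fixed delay, the processing time is added to every interval.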
    

Then increase the number of threads the TaskScheduler uses for polling. You can do that by declaring a ThreadPoolTaskScheduler bean named IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME, which overrides the default one (whose pool size is 10). Alternatively, use Global Properties to override just the pool size of that default TaskScheduler: https://docs.spring.io/spring-integration/docs/5.0.6.RELEASE/reference/html/configuration.html#global-properties
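A minimal sketch of the bean-override option, assuming Java configuration; the class name, the pool size of 20, and the thread-name prefix are arbitrary illustrative choices, not values from the original answer.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
public class PollingSchedulerConfig {

    // Replaces Spring Integration's default "taskScheduler" bean (pool size 10 by default).
    // Global Properties alternative without a bean: put
    //   spring.integration.taskScheduler.poolSize=20
    // in META-INF/spring.integration.properties.
    @Bean(name = IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME)
    public ThreadPoolTaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(20);                // example size; tune it against the API's rate limit
        scheduler.setThreadNamePrefix("poller-"); // makes the polling threads easy to spot in logs
        return scheduler;
    }
}
```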