Search code examples
spring-integrationspring-integration-dsl

How to restrict transaction boundaries for JdbcPollingChannelAdapter


I have a JdbcPollingChannelAdapter defined as following:

@Bean
public MessageSource<Object> jdbcMessageSource(DataSource dataSource) {
    JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(dataSource,
            "SELECT * FROM common_task where due_at <= NOW() and retries < order by due_at ASC FOR UPDATE SKIP LOCKED");
    jdbcPollingChannelAdapter.setMaxRowsPerPoll(1);
    jdbcPollingChannelAdapter.setUpdateSql("Update common_task set retries = :retries, due_at = due_at + interval '10 minutes' WHERE ID = (:id)");
    jdbcPollingChannelAdapter.setUpdatePerRow(true);
    jdbcPollingChannelAdapter.setRowMapper(this::mapRow);
    jdbcPollingChannelAdapter.setUpdateSqlParameterSourceFactory(this::updateParamSource);
    return jdbcPollingChannelAdapter;
}

The integration flow for this:

@Bean
public IntegrationFlow pollingFlow(MessageSource<Object> jdbcMessageSource) {
    return IntegrationFlows.from(jdbcMessageSource,
            c -> c.poller(Pollers.fixedRate(250, TimeUnit.MILLISECONDS)
                    .maxMessagesPerPoll(1)
                    .transactional()))
            .split()
            .channel(taskSourceChannel())
            .get();
}

The service activator is defined as

@ServiceActivator(inputChannel = "taskSourceChannel")
    public void doSomething(FooTask event) {
        //do something but ** not ** within the transaction of the poller.
    }      

The poller in integration flow is defined as transactional. Based on my understanding, this will 1. Execute select query and update query in transaction. 2. It will also execute doSomething() method in the same transaction.

Goal: I would like to do 1 and not 2. I would like to do select and update in a transaction to make sure both happens. But, I don;t want to execute doSomething() in the same transaction. In case of an exeception in doSomething(), I still want to persist the updates made during polling. How can I acheive this ?


Solution

  • This is done via simple thread shifting. So, what you need is just leave the polling thread, allow it to commit TX and continue process in the separate thread.

    According your logic with the .split(), it's even better to have new thread processing already after splitting, so items are going to be even processed by that doSomething() in parallel.

    The goal simply can be achieved with an ExecutorChannel. Since you already have that taskSourceChannel(), just consider to replace it with ExecutorChannel based on some managed ThreadPoolTaskExecutor.

    See more info in the Reference Manual: https://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html#channel-configuration-executorchannel

    And its Javadocs.

    The simple Java Configuration variant is like this:

        @Bean
        public MessageChannel taskSourceChannel() {
            return new ExecutorChannel(executor());
        }
    
        @Bean
        public Executor executor() {
            return new ThreadPoolTaskExecutor();
        }