Spring Integration: how do I start a database transaction in the flow?


I have an integration flow whose last step sinks DML queries into a database.

I enabled transaction logging in my configuration:

logging.level.org.springframework.transaction.interceptor=TRACE
logging.level.org.springframework.transaction.support=DEBUG

The sinkSql method is called, but nothing about a transaction appears in the log.

If I just call e.transactional(true), I get an error because there are two transaction managers in the context (one belongs to the source database).

@Bean(name = SYBASE_TRAN_MANAGER)
public PlatformTransactionManager transactionManager(@Qualifier(SYBASE_DS) final DataSource sybaseDataSource) {
    DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
    dataSourceTransactionManager.setDataSource(sybaseDataSource);
    return dataSourceTransactionManager;
}

@Bean
public TransactionInterceptor sybaseTransactionInterceptor(@Qualifier(SYBASE_TRAN_MANAGER) final TransactionManager tm) {
    return new TransactionInterceptorBuilder(true)
            .transactionManager(tm)
            .isolation(Isolation.READ_COMMITTED)
            .propagation(Propagation.REQUIRES_NEW)
            .readOnly(false)
            .build();
}

@Bean
public IntegrationFlow sinkSqlFlow(final TransactionInterceptor sybaseTransactionInterceptor) {
    return IntegrationFlows.from(SYBASE_SINK_SQL)
            .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, SYBASE_ERROR))
            .handle(this::sinkSql, e -> e.transactional(sybaseTransactionInterceptor))
            .get();
}

public void sinkSql(final Message<?> message) {
    //jdbcTemplate logic here
}

Solution

  • It is not clear what the question is, since TransactionAspectSupport falls back to exactly this lookup when no explicit TransactionManager is provided in the interceptor configuration:

    if (defaultTransactionManager == null) {
        defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
        this.transactionManagerCache.putIfAbsent(
                DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
    }
    

    That getBean(Class) call fails when several beans of that type are present in the application context, which is why e.transactional(true) does not work in your case. So what you have done so far with the sybaseTransactionInterceptor bean definition is OK.

    You can use an overloaded Java DSL method though:

    /**
     * Specify a {@link TransactionInterceptor} {@link Advice} with the provided
     * {@code PlatformTransactionManager} and default
     * {@link org.springframework.transaction.interceptor.DefaultTransactionAttribute}
     * for the {@link MessageHandler}.
     * @param transactionManager the {@link TransactionManager} to use.
     * @param handleMessageAdvice the flag to indicate the target {@link Advice} type:
     * {@code false} - regular {@link TransactionInterceptor}; {@code true} -
     * {@link org.springframework.integration.transaction.TransactionHandleMessageAdvice}
     * extension.
     * @return the spec.
     */
    public S transactional(TransactionManager transactionManager, boolean handleMessageAdvice) {
    

    However, since your sink contract is void sinkSql(final Message<?> message), there is no need for that true: the transaction is applied to the handleMessage() method, which is the end of your flow anyway.
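
As a rough sketch of the overload mentioned above, the flow from the question could pass the qualified transaction manager directly instead of a separate TransactionInterceptor bean. This assumes Spring Integration 5.2 or later (where the DSL accepts a TransactionManager). The class name SinkSqlFlowConfig and the three constant values are hypothetical placeholders, since the question does not show them.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.transaction.TransactionManager;

@Configuration
public class SinkSqlFlowConfig {

    // Hypothetical values: the question does not show what these constants contain.
    static final String SYBASE_TRAN_MANAGER = "sybaseTransactionManager";
    static final String SYBASE_SINK_SQL = "sybaseSinkSql";
    static final String SYBASE_ERROR = "sybaseError";

    @Bean
    public IntegrationFlow sinkSqlFlow(
            @Qualifier(SYBASE_TRAN_MANAGER) final TransactionManager tm) {
        return IntegrationFlows.from(SYBASE_SINK_SQL)
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, SYBASE_ERROR))
                // Passing the manager explicitly avoids the by-type lookup that
                // fails when two transaction managers exist in the context.
                .handle(this::sinkSql, e -> e.transactional(tm))
                .get();
    }

    public void sinkSql(final Message<?> message) {
        // jdbcTemplate logic here
    }
}
```

Note that this overload uses the default transaction attributes, so if the REQUIRES_NEW propagation and READ_COMMITTED isolation from the question matter, the explicit sybaseTransactionInterceptor bean remains the better fit.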