Search code examples
javaspringspring-integrationspring-integration-dsl

How to handle transactions for Spring integration flows (Java DSL)


How is it possible to define a transaction for a complete flow in spring integration (Java DSL)?

With Spring integration we can define an example flow with:

@Bean
public IntegrationFlow myMessageFromMessageAmqpInboundFlow() {
    return IntegrationFlows.from(myInboundChannel)
            .transform(aMessageTransformer)
            .transform(anotherMessageTransformer)
            .channel(anOutputChannel)
            .get();
}

I need a transaction which overspans the complete flow. Currently, when I access a database with 'aMessageTransformer', the transaction will be closed after this message transformer has been processed. But I need a transaction which is still uncommitted when processing 'anotherMessageTransformer'?

I expected that I just have to add a '@Transactional' (or @Transactional(propagation = Propagation.REQUIRED, readOnly = true))

@Bean
@Transactional
public IntegrationFlow myMessageFromMessageAmqpInboundFlow() {
    return IntegrationFlows.from(myInboundChannel)
            .transform(aMessageTransformer)
            .transform(anotherMessageTransformer)
            .channel(anOutputChannel)
            .get();
}

but this leads to a 'no session exception' in 'anotherMessageTransformer'


Solution

  • You need to follow this documentation and therefore add this to your flow:

    .transform(aMessageTransformer, e -> e.transactional(true))
    

    where that .transactional() is about:

    /**
     * Specify a {@link TransactionInterceptor} {@link Advice} with default
     * {@code PlatformTransactionManager} and {@link DefaultTransactionAttribute} for the
     * {@link MessageHandler}.
     * @param handleMessageAdvice the flag to indicate the target {@link Advice} type:
     * {@code false} - regular {@link TransactionInterceptor}; {@code true} -
     * {@link org.springframework.integration.transaction.TransactionHandleMessageAdvice}
     * extension.
     * @return the spec.
     */
    public S transactional(boolean handleMessageAdvice) {
    

    The TransactionHandleMessageAdvice means:

    * When this {@link Advice} is used from the {@code request-handler-advice-chain}, it is applied
     * to the {@link MessageHandler#handleMessage}
     * (not to the
     * {@link org.springframework.integration.handler.AbstractReplyProducingMessageHandler.RequestHandler#handleRequestMessage}),
     * therefore the entire downstream process is wrapped to the transaction.