
How do I use a Transaction in a Reactive Flow in Spring Integration?


I am querying a database for an item using R2DBC and Spring Integration. I want to extend the transaction boundary to include a handler: if the handler fails, I want to roll back the database operation. But I'm having difficulty even establishing transactionality explicitly in my integration flow. The flow is defined as:

@Bean
IntegrationFlow flow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlows
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
                "SELECT * FROM events LIMIT 1")
            .expectSingleResult(true)
            .payloadType(Event.class),
            e -> e.poller(Pollers.cron("30/2 * * * * *")
                .transactional(transactionManager)))
        .channel(MessageChannels.flux())
        .handle(Mono.class, (payload, headers) -> doSomethingUsingSameTransaction(payload), e -> e.async(true))
        .channel(MessageChannels.queue("queue"))
        .get();
}

where the transaction manager is obtained in this way:

@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
    return new R2dbcTransactionManager(connectionFactory);
}

When trying this, I get an exception:

reactor.core.Exceptions$ErrorCallbackNotImplemented: 
org.springframework.messaging.MessagingException: 
    nested exception is java.lang.IllegalStateException: 
    Cannot apply reactive transaction to non-reactive return type: class java.lang.Object

Looking at the Spring Integration code reveals the issue.

A TransactionInterceptor is added as advice to the polling task created in AbstractPollingEndpoint#createPollingTask. That task is a Callable<Message<?>>.

Soon, the poller (a reactive implementation) triggers and AbstractPollingEndpoint#pollForMessage is invoked - causing the advice TransactionInterceptor#invoke to be called. It calls TransactionAspectSupport#invokeWithinTransaction, which determines that the transaction manager is a reactive one, and that the polling task does not return a reactive type (it's the Object returned from the Callable) - and throws the exception.

The failure occurs before handing off to the channel or anything after it.

It makes sense, since the R2DBC endpoint returns a Message<Flux<Event>>.
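
The return-type check described above can be modeled with plain JDK types. This is a simplified sketch, not Spring's actual code: `ReactiveTxCheck` and its methods are hypothetical names, and `java.util.concurrent.Flow.Publisher` is only a stand-in for the reactive types Spring would recognize. It shows why a `Callable` task fails such a check: after type erasure, `call()` is declared to return `Object`.

```java
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.Flow;

// Simplified stand-in for the return-type check; not Spring's implementation.
class ReactiveTxCheck {

    // Assumption: Flow.Publisher stands in for the reactive types
    // (Mono, Flux, Publisher) that Spring would accept here.
    static boolean isReactive(Class<?> returnType) {
        return Flow.Publisher.class.isAssignableFrom(returnType);
    }

    // A reactive transaction can only wrap a method whose declared
    // return type is reactive; anything else is rejected up front.
    static void requireReactiveReturnType(Method method) {
        Class<?> returnType = method.getReturnType();
        if (!isReactive(returnType)) {
            throw new IllegalStateException(
                "Cannot apply reactive transaction to non-reactive return type: " + returnType);
        }
    }

    public static void main(String[] args) throws NoSuchMethodException {
        // The polling task is a Callable; the erased return type of call() is Object.
        Method call = Callable.class.getMethod("call");
        try {
            requireReactiveReturnType(call);
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

Running `main` prints the same message as the stack trace above, ending in `class java.lang.Object`. The declared return type is what gets inspected, so it does not matter that the message payload produced at runtime is reactive.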

So I'm left wondering how to access transaction semantics for a reactive flow.

In other words, I'm expecting that the transaction boundaries would start with the poll and end on its return - and that the transaction applies for the lifetime of the reactive context (subscription). The actual application does leverage FluxChannels and the handler is a consumer of one of those channels.

I realize that I want to keep those boundaries slim, but in the use case I have, I need to perform a locking operation on a table row, perform an external task and release the lock within the scope of a transaction.

The options I have seem to be:

  1. Start the transaction at the start of the flow. This likely suffers from the same problem I am having now, since there's no indication this is a reactive flow:

    @Transactional
    @Bean
    IntegrationFlow flow(...

  2. Try something like .transactional(new TransactionInterceptorBuilder().build()), which will create a transaction if needed. But I'm not sure it will be aware of the reactive transaction manager and its state.

  3. Use a compensating action: restore the former state of the data in the row.

  4. Use my own handler to encapsulate the database operation and other actions, wrapping all of the operations together.

  5. Manage the transaction myself.

  6. Configure the flow differently based on help from this forum.

Is there a way to extend the transactional boundaries in defining the flow? Or what would be an effective way to accomplish the task?

Thank you very much in advance.

This complete test illustrates the problem:

@Slf4j
@SpringJUnitConfig
@DirtiesContext
public class R2dbcTransactionalTest {

    @Autowired
    DatabaseClient client;

    R2dbcEntityTemplate entityTemplate;

    @Qualifier("queue")
    @Autowired
    QueueChannel validationChannel;

    @BeforeEach
    public void setup() {
        this.entityTemplate = new R2dbcEntityTemplate(this.client, H2Dialect.INSTANCE);

        List<String> statements =
            Arrays.asList(
                "DROP TABLE IF EXISTS events;",
                "CREATE TABLE events (id INT AUTO_INCREMENT NOT NULL, details VARCHAR2 NOT NULL, timestamp TIMESTAMP NOT NULL);");

        statements.forEach(it -> this.client.sql(it)
            .fetch()
            .rowsUpdated()
            .as(StepVerifier::create)
            .expectNextCount(1)
            .verifyComplete());
    }

    @Test
    public void validateSuccessfulIntegrationFlow() {
        this.entityTemplate.insert(new Event(Instant.now(), "Event details"))
            .then()
            .as(StepVerifier::create)
            .expectComplete()
            .verify(Duration.ofSeconds(1));

        // Validate string
        final Message<?> message = validationChannel.receive();
        assertThat(message.getPayload()).isEqualTo("Event details");
        assertThat(message.getHeaders()).containsKey("foo");
    }

    @Import(R2dbcDatabaseConfiguration.class)
    @Configuration
    @EnableIntegration
    static class SpringIntegrationConfiguration {

        @Autowired
        R2dbcEntityTemplate r2dbcEntityTemplate;

        @Autowired
        ReactiveTransactionManager transactionManager;

        @Bean
        IntegrationFlow flow(R2dbcEntityTemplate r2dbcEntityTemplate) {
            return IntegrationFlows
                .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
                        "SELECT * FROM events LIMIT 1")
                    .expectSingleResult(true)
                    .payloadType(Event.class),
                    e -> e.poller(Pollers.cron("30/2 * * * * *")
                        .transactional(transactionManager)))
                .channel(MessageChannels.flux())
                .handle(Mono.class, (payload, headers) -> doSomethingUsingSameTransaction(payload), e -> e.async(true))
                .channel(MessageChannels.queue("queue"))
                .get();
        }

        public Mono<Message<String>> doSomethingUsingSameTransaction(Mono<Event> eventMono) {
            return eventMono.map(event ->
                MessageBuilder
                .withPayload(event.getDetails())
                .setHeader("foo", "baz")
                .build());
        }
    }

    @Configuration
    @EnableR2dbcRepositories(basePackages = "org.springframework.integration.r2dbc.repository")
    static class R2dbcDatabaseConfiguration extends AbstractR2dbcConfiguration {

        @Bean
        @Override
        public ConnectionFactory connectionFactory() {
            return createConnectionFactory();
        }

        public ConnectionFactory createConnectionFactory() {
            return new H2ConnectionFactory(H2ConnectionConfiguration.builder()
                .inMemory("r2dbc")
                .username("sa")
                .password("")
                .option("DB_CLOSE_DELAY=-1").build());
        }

        @Bean
        public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
            return DatabaseClient.create(connectionFactory);
        }

        @Bean
        public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
            return new R2dbcEntityTemplate(databaseClient, H2Dialect.INSTANCE);
        }

        @Bean
        ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
            return new R2dbcTransactionManager(connectionFactory);
        }

    }

    @Table("events")
    @Getter
    @Setter
    @RequiredArgsConstructor
    static class Event {
        @Id
        private Integer id;

        @NonNull
        public Instant timestamp;
        @NonNull
        public String details;

    }

}

Solution

  • Well, this indeed isn't possible in that declarative way, since at that level we have no hook for injecting a transaction into a reactive type in the middle of the flow.

    Take a look at TransactionalOperator and how it can be used from the Java DSL's fluxTransform():

    /**
     * Populate a {@link FluxMessageChannel} to start a reactive processing for upstream data,
     * wrap it to a {@link Flux}, apply provided {@link Function} via {@link Flux#transform(Function)}
     * and emit the result to one more {@link FluxMessageChannel}, subscribed in the downstream flow.
     * @param fluxFunction the {@link Function} to process data reactive manner.
     * @param <I> the input payload type.
     * @param <O> the output type.
     * @return the current {@link BaseIntegrationFlowDefinition}.
     */
    @SuppressWarnings(UNCHECKED)
    public <I, O> B fluxTransform(Function<? super Flux<Message<I>>, ? extends Publisher<O>> fluxFunction) {
    

    See also Spring Framework docs: https://docs.spring.io/spring-framework/docs/current/reference/html/data-access.html#tx-prog-operator.

    I will think about declarative reactive transactions on the level you mention in your question...
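
    As a rough illustration of the TransactionalOperator approach, here is a minimal sketch. It is not taken from the answer: the class name `ReactiveTransactionSketch` and the helper `queryAndHandle` are hypothetical, and it assumes the `ReactiveTransactionManager` bean from the question. Only `TransactionalOperator.create(...)` and `transactional(Mono)` are Spring Framework API.

```java
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;

import reactor.core.publisher.Mono;

@Configuration
class ReactiveTransactionSketch {

    // Programmatic counterpart of @Transactional for reactive pipelines,
    // built on the ReactiveTransactionManager bean from the question.
    @Bean
    TransactionalOperator transactionalOperator(ReactiveTransactionManager transactionManager) {
        return TransactionalOperator.create(transactionManager);
    }

    // Hypothetical helper (not part of Spring): composes the lazy query and the
    // handler into one pipeline and runs it inside a single reactive transaction.
    // An error signal from either step rolls back; normal completion commits.
    static <T, R> Mono<R> queryAndHandle(TransactionalOperator operator,
            Mono<T> query, Function<? super T, Mono<R>> handler) {
        return operator.transactional(query.flatMap(handler));
    }
}
```

    One untested way to use this in the flow from the question would be to drop `.transactional(transactionManager)` from the poller and call `queryAndHandle(...)` on the `Mono` payload per message, either in the handler or inside a `fluxTransform()` step. This assumes the inbound adapter's payload is a cold `Mono` that has not been subscribed yet, so the query only runs once the transactional pipeline is subscribed.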