I am querying a database for an item using R2DBC and Spring Integration. I want to extend the transaction boundary a bit to include a handler - if the handler fails I want to roll back the database operation. But I'm having difficulty even establishing transactionality explicitly in my integration flow. The flow is defined as
@Bean
IntegrationFlow flow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlows
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
"SELECT * FROM events LIMIT 1")
.expectSingleResult(true)
.payloadType(Event.class),
e - > e.poller(Pollers.cron("30/2 * * * * *")
.transactional(transactionManager)))
.channel(MessageChannels.flux())
.handle(Mono.class, (payload, headers) - > doSomethingUsingSameTransaction(payload), e - > e.async(true))
.channel(MessageChannels.queue("queue"))
.get();
}
where the transaction manager is obtained in this way:
@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
When trying this, I get an exception:
reactor.core.Exceptions$ErrorCallbackNotImplemented:
org.springframework.messaging.MessagingException:
nested exception is java.lang.IllegalStateException:
Cannot apply reactive transaction to non-reactive return type: class java.lang.Object
Looking at the Spring Integration code reveals the issue.
A TransactionInterceptor is added as advice to the polling task created in AbstractPollingEndpoint#createPollingTask
. That task is a Callable<Message<?>>
.
Soon, the poller (a reactive implementation) triggers and AbstractPollingEndpoint#pollForMessage
is invoked - causing the advice TransactionInterceptor#invoke
to be called. It calls TransactionAspectSupport#invokeWithinTransaction
, which determines that the transaction manager is a reactive one, and that the polling task does not return a reactive type (it's the Object returned from the Callable) - and throws the exception.
The failure occurs before handing off to the channel or anything after it.
It makes sense, since the R2DBC endpoint returns a Message<Flux<Event>>
.
So, I'm left wondering how to access the transaction semantics for a reactive flow -
In other words, I'm expecting that the transaction boundaries would start with the poll and end on its return - and that the transaction applies for the lifetime of the reactive context (subscription). The actual application does leverage FluxChannels and the handler is a consumer of one of those channels.
I realize that I want to keep those boundaries slim, but in the use case I have, I need to perform a locking operation on a table row, perform an external task and release the lock within the scope of a transaction.
The options I have seem to be :
@Transactional
@Bean
IntegrationFlow flow(...
Try something like
.transactional(new TransactionInterceptorBuilder().build())
which will create a transaction if needed - but I'm not sure it will be aware of the reactive transaction manager and its state...
Use a compensating action - recalling the former state of the data in the row
Use my own handler to encapsulate the database operation and other actions to wrap all of the operations together.
Manage the transaction myself
Configure the flow differently based on help from this forum.
Is there a way to extend the transactional boundaries in defining the flow? Or what would be an effective way to accomplish the task?
Thank you very much in advance.
This complete test illustrates the problem:
@Slf4j
@SpringJUnitConfig
@DirtiesContext
public class R2dbcTransactionalTest {
@Autowired
DatabaseClient client;
R2dbcEntityTemplate entityTemplate;
@Qualifier("queue")
@Autowired
QueueChannel validationChannel;
@BeforeEach
public void setup() {
this.entityTemplate = new R2dbcEntityTemplate(this.client, H2Dialect.INSTANCE);
List < String > statements =
Arrays.asList(
"DROP TABLE IF EXISTS events;",
"CREATE TABLE events (id INT AUTO_INCREMENT NOT NULL, details VARCHAR2 NOT NULL, timestamp TIMESTAMP NOT NULL);");
statements.forEach(it - > this.client.sql(it)
.fetch()
.rowsUpdated()
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete());
}
@Test
public void validateSuccessfulIntegrationFlow() {
this.entityTemplate.insert(new Event(Instant.now(), "Event details"))
.then()
.as(StepVerifier::create)
.expectComplete()
.verify(Duration.ofSeconds(1));
// Validate string
final Message << ? > message = validationChannel.receive();
assertThat(message.getPayload()).isEqualTo("Event details");
assertThat(message.getHeaders()).containsKey("foo");
}
@Import(R2dbcDatabaseConfiguration.class)
@Configuration
@EnableIntegration
static class SpringIntegrationConfiguration {
@Autowired
R2dbcEntityTemplate r2dbcEntityTemplate;
@Autowired
ReactiveTransactionManager transactionManager;
@Bean
IntegrationFlow flow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlows
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
"SELECT * FROM events LIMIT 1")
.expectSingleResult(true)
.payloadType(Event.class),
e - > e.poller(Pollers.cron("30/2 * * * * *")
.transactional(transactionManager)))
.channel(MessageChannels.flux())
.handle(Mono.class, (payload, headers) - > doSomethingUsingSameTransaction(payload), e - > e.async(true))
.channel(MessageChannels.queue("queue"))
.get();
}
public Mono < Message < String >> doSomethingUsingSameTransaction(Mono < Event > eventMono) {
return eventMono.map(event - >
MessageBuilder
.withPayload(event.getDetails())
.setHeader("foo", "baz")
.build());
}
}
@Configuration
@EnableR2dbcRepositories(basePackages = "org.springframework.integration.r2dbc.repository")
static class R2dbcDatabaseConfiguration extends AbstractR2dbcConfiguration {
@Bean
@Override
public ConnectionFactory connectionFactory() {
return createConnectionFactory();
}
public ConnectionFactory createConnectionFactory() {
return new H2ConnectionFactory(H2ConnectionConfiguration.builder()
.inMemory("r2dbc")
.username("sa")
.password("")
.option("DB_CLOSE_DELAY=-1").build());
}
@Bean
public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
return DatabaseClient.create(connectionFactory);
}
@Bean
public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
return new R2dbcEntityTemplate(databaseClient, H2Dialect.INSTANCE);
}
@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new R2dbcTransactionManager(connectionFactory);
}
}
@Table("events")
@Getter
@Setter
@RequiredArgsConstructor
static class Event {
@Id
private Integer id;
@NonNull
public Instant timestamp;
@NonNull
public String details;
}
}
Well, it's indeed not possible that declarative way since we don't have hook for injecting to the reactive type in the middle on that level.
Try to look into a TransactionalOperator
and its usage from the Java DSL's fluxTransform()
:
/**
* Populate a {@link FluxMessageChannel} to start a reactive processing for upstream data,
* wrap it to a {@link Flux}, apply provided {@link Function} via {@link Flux#transform(Function)}
* and emit the result to one more {@link FluxMessageChannel}, subscribed in the downstream flow.
* @param fluxFunction the {@link Function} to process data reactive manner.
* @param <I> the input payload type.
* @param <O> the output type.
* @return the current {@link BaseIntegrationFlowDefinition}.
*/
@SuppressWarnings(UNCHECKED)
public <I, O> B fluxTransform(Function<? super Flux<Message<I>>, ? extends Publisher<O>> fluxFunction) {
See also Spring Framework docs: https://docs.spring.io/spring-framework/docs/current/reference/html/data-access.html#tx-prog-operator.
I will think about declarative reactive transactions on the level you mention in your question...