I am using R2dbcMessageSource
to query a table for an item and want to use one of the columns to create a message to send to a message handler.
The result of the query is a Message<Mono<Event>>
, which makes perfect sense. I want to take event.getDetails
and create a Message.
Using a DirectChannel
and Transformer
, I tried something like this
@Bean
@Transformer(inputChannel = "fromR2dbcChannel", outputChannel = "fromTransformer")
public GenericTransformer<Message<Mono<Event>>, Message<String>> monoToString() {
return message -> {
Mono<Event> payload = message.getPayload();
final String details = payload.block();
if (details == null) {
return null;
}
return MessageBuilder
.withPayload(details)
.setHeader("foo", "baz")
.build();
};
}
Of course, I would need to put this on its own executor to avoid tying up the thread, but it doesn't work anyway - it throws a class cast exception.
error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@1651b34e]; nested exception is java.lang.ClassCastException: reactor.core.publisher.MonoOnAssembly cannot be cast to org.springframework.messaging.Message, failedMessage=GenericMessage [payload=Mono.flatMap
This seems to be because the Transformer
is evaluated on assembly.
Using a FluxMessageChannel
seems strange, since the source is not reactive itself, but it allows me to use a service activator transform the payload.
@Bean
@ServiceActivator(inputChannel = "fromR2dbcChannel", reactive = @Reactive("publishMessageWithString"))
public MessageHandler toValidator() {
return message -> validationChannel().send(message);
}
@Bean
public Function<Flux<Message<Mono<Event>>>, Flux<Message<String>>> publishMessageWithString() {
return flux -> flux
.as(toMessageWithString());
}
private Function<Flux<Message<Mono<Event>>>, Flux<Message<String>>> toMessageWithString() {
return messageFlux -> messageFlux.map(message -> {
final Mono<Event> payload = message.getPayload();
if (payload != null) {
final Event event = payload.block();
if (event != null) {
return MessageBuilder
.withPayload(event.getDetails())
.setHeader("foo", "baz")
.build();
}
}
return null;
});
}
In real life, the message handler attached to the service activator does not understand flux natively. It's a KinesisMessageHandler
.
While this seems to work, it feels awkward. Is there a better way to transform that's reactor friendly? Or an adapter that subscribes and invokes a handler? It seems very reasonable to pass Message<Mono<Event>>
end to end, so the latter may not be appropriate. Thanks!
UPDATE
Thanks to Artem, a working test follows:
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import io.r2dbc.h2.H2ConnectionConfiguration;
import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.annotation.Id;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.data.r2dbc.dialect.H2Dialect;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import org.springframework.data.relational.core.mapping.Table;
import org.springframework.integration.annotation.BridgeTo;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.r2dbc.inbound.R2dbcMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import static org.assertj.core.api.Assertions.assertThat;
@Slf4j
@SpringJUnitConfig
@DirtiesContext
public class R2dbcTest {
@Autowired
DatabaseClient client;
R2dbcEntityTemplate entityTemplate;
@Autowired
QueueChannel validationChannel;
@Autowired
FluxMessageChannel fluxChannel;
@BeforeEach
public void setup() {
this.entityTemplate = new R2dbcEntityTemplate(this.client, H2Dialect.INSTANCE);
List<String> statements =
Arrays.asList(
"DROP TABLE IF EXISTS events;",
"CREATE TABLE events (id INT AUTO_INCREMENT NOT NULL, details VARCHAR2 NOT NULL, timestamp TIMESTAMP NOT NULL);");
statements.forEach(it -> this.client.sql(it)
.fetch()
.rowsUpdated()
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete());
}
@Test
public void validateSuccessfulIntegrationFlow() throws InterruptedException {
this.entityTemplate.insert(new Event(Instant.now(), "Event details"))
.then()
.as(StepVerifier::create)
.verifyComplete();
// Validate string
final Message<?> message = validationChannel.receive();
assertThat(message.getPayload()).isEqualTo("Event details");
assertThat(message.getHeaders()).containsKey("foo");
}
@Import(R2dbcDatabaseConfiguration.class)
@Configuration
@EnableIntegration
static class SpringIntegrationConfiguration {
@Autowired
R2dbcEntityTemplate r2dbcEntityTemplate;
@Bean
FluxMessageChannel fromR2dbcChannel() {
return new FluxMessageChannel();
}
@BridgeTo(value = "validationChannel")
@Bean
FluxMessageChannel fluxChannel() {
return new FluxMessageChannel();
}
@Bean
QueueChannel validationChannel() {
return new QueueChannel();
}
@ServiceActivator(inputChannel = "fromR2dbcChannel", outputChannel = "fluxChannel", async = "true")
public Mono<Message<String>> transformEvent(Mono<Event> eventMono) {
return eventMono.map(event ->
MessageBuilder
.withPayload(event.getDetails())
.setHeader("foo", "baz")
.build());
}
// Cron expression is only here because Spring environment is fully initialized before test
// creates table, so wait for the test to start.
@Bean
@InboundChannelAdapter(value = "fromR2dbcChannel", poller = @Poller(cron = "30/2 * * * * *"))
public R2dbcMessageSource r2dbcMessageSourceSelectOne(R2dbcEntityTemplate r2dbcEntityTemplate) {
R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(r2dbcEntityTemplate,
"SELECT * FROM events LIMIT 1");
r2dbcMessageSource.setPayloadType(Event.class);
r2dbcMessageSource.setExpectSingleResult(true);
return r2dbcMessageSource;
}
}
@Configuration
@EnableR2dbcRepositories(basePackages = "org.springframework.integration.r2dbc.repository")
static class R2dbcDatabaseConfiguration extends AbstractR2dbcConfiguration {
@Bean
@Override
public ConnectionFactory connectionFactory() {
return createConnectionFactory();
}
public ConnectionFactory createConnectionFactory() {
return new H2ConnectionFactory(H2ConnectionConfiguration.builder()
.inMemory("r2dbc")
.username("sa")
.password("")
.option("DB_CLOSE_DELAY=-1").build());
}
@Bean
public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
return DatabaseClient.create(connectionFactory);
}
@Bean
public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
return new R2dbcEntityTemplate(databaseClient, H2Dialect.INSTANCE);
}
}
@Table("events")
@Getter
@Setter
@RequiredArgsConstructor
static class Event {
@Id
private Integer id;
@NonNull
public Instant timestamp;
@NonNull
public String details;
}
}
I would advice to re-think your vision about reactive stream and start avoiding calling that .block()
manually.
Looking to your whole flow requirements, it is really better to make that fromR2dbcChannel
as a FluxMessageChannel
, so your Mono
from R2DBC is going to be subscribed and processed smoothly internally by the framework if there is data.
Your @Transformer(inputChannel = "fromR2dbcChannel", outputChannel = "fromTransformer")
then could just deal with a plain Event
as an input parameter. Then your KinesisMessageHandler
is good to deal with whatever you send to its input channel in the palyoad from that event.getDetails()
.
UPDATE
So, my bad. Independently of the channel for @InboundChannelAdapter
, it still going to produce a Mono
in the payload of the message. From here the channel type really doesn't matter. |But at the same time you can make that validationChannel
as a FluxMessageChannel
and then your transformer must be changed to the service activator:
@ServiceActivator(inputChannel = "fromR2dbcChannel", outputChannel = "validationChannel", async = "true")
public Mono<Message<String>> transformEvent(Mono<Event> eventMono) {
return eventMono.map(
MessageBuilder
.withPayload(event.getDetails())
.setHeader("foo", "baz")
.build());
}
This way the result Mono
is going to be subscribed by the FluxMessageChannel
and the rest of your "blocking" flow should remain the same.
The problem with transformer that it does return the Message
if result of POJO method is not a Message
.