How do I transform the result of R2dbcMessageSource into a new Message derived from the query result?


I am using R2dbcMessageSource to query a table for an item and want to use one of the columns to create a message to send to a message handler.

The result of the query is a Message<Mono<Event>>, which makes perfect sense. I want to take event.getDetails and create a Message.

Using a DirectChannel and a Transformer, I tried something like this:

  @Bean
  @Transformer(inputChannel = "fromR2dbcChannel", outputChannel = "fromTransformer")
  public GenericTransformer<Message<Mono<Event>>, Message<String>> monoToString() {
    return message -> {
      Mono<Event> payload = message.getPayload();
      // block() yields the Event, not a String; the details are extracted from it
      final Event event = payload.block();
      if (event == null) {
        return null;
      }
      return MessageBuilder
        .withPayload(event.getDetails())
        .setHeader("foo", "baz")
        .build();
    };
  }

Of course, I would need to put this on its own executor to avoid tying up the thread, but it doesn't work anyway: it throws a ClassCastException.

error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@1651b34e]; nested exception is java.lang.ClassCastException: reactor.core.publisher.MonoOnAssembly cannot be cast to org.springframework.messaging.Message, failedMessage=GenericMessage [payload=Mono.flatMap

This seems to be because the Transformer is evaluated on assembly.

Using a FluxMessageChannel seems strange, since the source is not reactive itself, but it allows me to use a service activator to transform the payload.

    @Bean
    @ServiceActivator(inputChannel = "fromR2dbcChannel", reactive = @Reactive("publishMessageWithString"))
    public MessageHandler toValidator() {
      return message -> validationChannel().send(message);
    }

    @Bean
    public Function<Flux<Message<Mono<Event>>>, Flux<Message<String>>> publishMessageWithString() {
      return flux -> flux
        .as(toMessageWithString());
    }

    private Function<Flux<Message<Mono<Event>>>, Flux<Message<String>>> toMessageWithString() {
      return messageFlux -> messageFlux.map(message -> {
        final Mono<Event> payload = message.getPayload();
        if (payload != null) {
          final Event event = payload.block();
          if (event != null) {
            return MessageBuilder
              .withPayload(event.getDetails())
              .setHeader("foo", "baz")
              .build();
          }
        }
        return null;
      });
    }

In real life, the message handler attached to the service activator does not understand Flux natively. It's a KinesisMessageHandler.

While this seems to work, it feels awkward. Is there a better way to transform that's reactor friendly? Or an adapter that subscribes and invokes a handler? It seems very reasonable to pass Message<Mono<Event>> end to end, so the latter may not be appropriate. Thanks!

UPDATE

Thanks to Artem, a working test follows:

import java.time.Instant;
import java.util.Arrays;
import java.util.List;

import io.r2dbc.h2.H2ConnectionConfiguration;
import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.annotation.Id;
import org.springframework.data.r2dbc.config.AbstractR2dbcConfiguration;
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate;
import org.springframework.data.r2dbc.dialect.H2Dialect;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import org.springframework.data.relational.core.mapping.Table;
import org.springframework.integration.annotation.BridgeTo;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.r2dbc.inbound.R2dbcMessageSource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;

@Slf4j
@SpringJUnitConfig
@DirtiesContext
public class R2dbcTest {

  @Autowired
  DatabaseClient client;

  R2dbcEntityTemplate entityTemplate;

  @Autowired
  QueueChannel validationChannel;

  @Autowired
  FluxMessageChannel fluxChannel;

  @BeforeEach
  public void setup() {
    this.entityTemplate = new R2dbcEntityTemplate(this.client, H2Dialect.INSTANCE);

    List<String> statements =
      Arrays.asList(
        "DROP TABLE IF EXISTS events;",
        "CREATE TABLE events (id INT AUTO_INCREMENT NOT NULL, details VARCHAR2 NOT NULL, timestamp TIMESTAMP NOT NULL);");

    statements.forEach(it -> this.client.sql(it)
      .fetch()
      .rowsUpdated()
      .as(StepVerifier::create)
      .expectNextCount(1)
      .verifyComplete());
  }

  @Test
  public void validateSuccessfulIntegrationFlow() throws InterruptedException {
    this.entityTemplate.insert(new Event(Instant.now(), "Event details"))
      .then()
      .as(StepVerifier::create)
      .verifyComplete();

    // Validate string
    final Message<?> message = validationChannel.receive();
    assertThat(message.getPayload()).isEqualTo("Event details");
    assertThat(message.getHeaders()).containsKey("foo");
  }

  @Import(R2dbcDatabaseConfiguration.class)
  @Configuration
  @EnableIntegration
  static class SpringIntegrationConfiguration {

    @Autowired
    R2dbcEntityTemplate r2dbcEntityTemplate;

    @Bean
    FluxMessageChannel fromR2dbcChannel() {
      return new FluxMessageChannel();
    }

    @BridgeTo(value = "validationChannel")
    @Bean
    FluxMessageChannel fluxChannel() {
      return new FluxMessageChannel();
    }

    @Bean
    QueueChannel validationChannel() {
      return new QueueChannel();
    }

    @ServiceActivator(inputChannel = "fromR2dbcChannel", outputChannel = "fluxChannel", async = "true")
    public Mono<Message<String>> transformEvent(Mono<Event> eventMono) {
      return eventMono.map(event ->
        MessageBuilder
          .withPayload(event.getDetails())
          .setHeader("foo", "baz")
          .build());
    }

    // Cron expression is only here because Spring environment is fully initialized before test
    // creates table, so wait for the test to start.
    @Bean
    @InboundChannelAdapter(value = "fromR2dbcChannel", poller = @Poller(cron = "30/2 * * * * *"))
    public R2dbcMessageSource r2dbcMessageSourceSelectOne(R2dbcEntityTemplate r2dbcEntityTemplate) {
      R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(r2dbcEntityTemplate,
        "SELECT * FROM events LIMIT 1");
      r2dbcMessageSource.setPayloadType(Event.class);
      r2dbcMessageSource.setExpectSingleResult(true);
      return r2dbcMessageSource;
    }

  }

  @Configuration
  @EnableR2dbcRepositories(basePackages = "org.springframework.integration.r2dbc.repository")
  static class R2dbcDatabaseConfiguration extends AbstractR2dbcConfiguration {

    @Bean
    @Override
    public ConnectionFactory connectionFactory() {
      return createConnectionFactory();
    }

    public ConnectionFactory createConnectionFactory() {
      return new H2ConnectionFactory(H2ConnectionConfiguration.builder()
        .inMemory("r2dbc")
        .username("sa")
        .password("")
        .option("DB_CLOSE_DELAY=-1").build());
    }

    @Bean
    public DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
      return DatabaseClient.create(connectionFactory);
    }

    @Bean
    public R2dbcEntityTemplate r2dbcEntityTemplate(DatabaseClient databaseClient) {
      return new R2dbcEntityTemplate(databaseClient, H2Dialect.INSTANCE);
    }

  }

  @Table("events")
  @Getter
  @Setter
  @RequiredArgsConstructor
  static class Event {
    @Id
    private Integer id;

    @NonNull
    public Instant timestamp;
    @NonNull
    public String details;

  }

}


Solution

  • I would advise rethinking your view of reactive streams and avoiding manual calls to .block().

    Looking at the requirements of your whole flow, it is better to make fromR2dbcChannel a FluxMessageChannel, so that the Mono from R2DBC is subscribed to and processed internally by the framework when there is data.

    Your @Transformer(inputChannel = "fromR2dbcChannel", outputChannel = "fromTransformer") could then just take a plain Event as its input parameter. Your KinesisMessageHandler can then handle whatever you send to its input channel in the payload built from event.getDetails().

    UPDATE

    So, my bad. Regardless of the channel used for the @InboundChannelAdapter, it is still going to produce a Mono in the payload of the message, so from here the channel type really doesn't matter. However, you can make validationChannel a FluxMessageChannel, and then your transformer must be changed to a service activator:

        @ServiceActivator(inputChannel = "fromR2dbcChannel", outputChannel = "validationChannel", async = "true")
        public Mono<Message<String>> transformEvent(Mono<Event> eventMono) {
          // Map over the Mono instead of blocking on it
          return eventMono.map(event ->
            MessageBuilder
              .withPayload(event.getDetails())
              .setHeader("foo", "baz")
              .build());
        }
    

    This way the resulting Mono is subscribed to by the FluxMessageChannel, and the rest of your "blocking" flow should remain the same.

    The problem with the transformer is that it always returns a Message: if the result of the POJO method is not a Message, that result is wrapped into one as the payload.
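
The advice above to map over the Mono rather than call .block() can be illustrated without any Spring or Reactor dependency. The following is only a rough analogy, not the framework's implementation: it uses the JDK's CompletableFuture as a stand-in for Mono (note that a CompletableFuture is eager, while a Mono does nothing until it is subscribed to), and the Event and SimpleMessage classes are hypothetical placeholders for the entity and for a Spring Message.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class NonBlockingTransformSketch {

    // Hypothetical stand-in for the Event entity; only the details column matters here.
    static final class Event {
        final String details;
        Event(String details) { this.details = details; }
    }

    // Hypothetical stand-in for a Spring Message: a payload plus headers.
    static final class SimpleMessage {
        final String payload;
        final Map<String, Object> headers;
        SimpleMessage(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Describes the transformation and returns immediately, without waiting for the event.
    // Plays the role of eventMono.map(...) in the service activator.
    static CompletableFuture<SimpleMessage> transform(CompletableFuture<Event> eventFuture) {
        return eventFuture.thenApply(event ->
            new SimpleMessage(event.details,
                Collections.<String, Object>singletonMap("foo", "baz")));
    }

    public static void main(String[] args) {
        CompletableFuture<Event> pending = new CompletableFuture<>();
        CompletableFuture<SimpleMessage> result = transform(pending);

        // No data yet, and no thread is parked waiting for it.
        System.out.println("done before data arrives: " + result.isDone()); // false

        // Completing the source plays the role of the query emitting its row.
        pending.complete(new Event("Event details"));
        System.out.println(result.join().payload); // Event details
    }
}
```

The point of the sketch is that transform never waits: the caller gets a handle on the future result, and whoever owns the pipeline (the FluxMessageChannel in the real flow) decides when it is consumed.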