Search code examples
springspring-bootspring-integrationproject-reactorreactor

How to create a Reactive Inbound Channel Adapter in Spring Integration Reactor


I'd like to understand how to create a reactive channel adapter for Spring Integration with Reactor core. I've understood from other forums I've read that this Mongo DB reactive adapter can be a good example, but it contains lots of Mongo domain specific classes.

I've read the Reactive part of the docs, and I see that there is a need to implement MessageProducerSupport, but from the code example it looks that there is a need of implementing a class the extends MessageProducerSpec and calls the first one. Can someone give an example for the most basic usage and explain what is really a demand for creating such a channel adapter? What I understand that I should do is something like:

public IntegrationFlow buildPipe() {
   return IntegrationFlows.from(myMessageProducerSpec)
      .handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
      .handle(writeToKafka)
      .get();
}

Solution

  • The MessageProducerSpec is for Java DSL. It has nothing to do with low-level logic of the channel adapter. If you have a MessageProducerSupport, then this one is good enough for you to use in the flow definition:

    /**
     * Populate the provided {@link MessageProducerSupport} object to the {@link IntegrationFlowBuilder} chain.
     * The {@link org.springframework.integration.dsl.IntegrationFlow} {@code startMessageProducer}.
     * @param messageProducer the {@link MessageProducerSupport} to populate.
     * @return new {@link IntegrationFlowBuilder}.
     */
    public static IntegrationFlowBuilder from(MessageProducerSupport messageProducer) {
    

    See more in docs about arbitrary channel adapter usage in the Java DSL: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-protocol-adapters

    But again: forget about Java DSL API for your channel adapter. Implement that channel adapter first. Yes, reactive MessageProducerSupport must use subscribeToPublisher() in its doStart() implementation. The rest of the logic around building the Flux from source system is up to you and the library you are going to rely on.

    There is also a ReactiveRedisStreamMessageProducer and ZeroMqMessageProducer, but I cannot say that their code is easier to digest than the mentioned MongoDbChangeStreamMessageProducer.