I'd like to understand how to create a reactive channel adapter for Spring Integration with Reactor core. I've understood from other forums I've read that this Mongo DB reactive adapter can be a good example, but it contains lots of Mongo domain specific classes.
I've read the Reactive part of the docs, and I see that there is a need to implement MessageProducerSupport
, but from the code example it looks that there is a need of implementing a class the extends MessageProducerSpec
and calls the first one. Can someone give an example for the most basic usage and explain what is really a demand for creating such a channel adapter? What I understand that I should do is something like:
public IntegrationFlow buildPipe() {
return IntegrationFlows.from(myMessageProducerSpec)
.handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
.handle(writeToKafka)
.get();
}
The MessageProducerSpec
is for Java DSL. It has nothing to do with low-level logic of the channel adapter. If you have a MessageProducerSupport
, then this one is good enough for you to use in the flow definition:
/**
* Populate the provided {@link MessageProducerSupport} object to the {@link IntegrationFlowBuilder} chain.
* The {@link org.springframework.integration.dsl.IntegrationFlow} {@code startMessageProducer}.
* @param messageProducer the {@link MessageProducerSupport} to populate.
* @return new {@link IntegrationFlowBuilder}.
*/
public static IntegrationFlowBuilder from(MessageProducerSupport messageProducer) {
See more in docs about arbitrary channel adapter usage in the Java DSL: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-protocol-adapters
But again: forget about Java DSL API for your channel adapter. Implement that channel adapter first. Yes, reactive MessageProducerSupport
must use subscribeToPublisher()
in its doStart()
implementation. The rest of the logic around building the Flux
from source system is up to you and the library you are going to rely on.
There is also a ReactiveRedisStreamMessageProducer
and ZeroMqMessageProducer
, but I cannot say that their code is easier to digest than the mentioned MongoDbChangeStreamMessageProducer
.