I'm using Project Reactor with Spring Integration to read from Kafka and write to MongoDB. The Kafka consumer works well, but .handle(MongoDb.reactiveOutboundChannelAdapter(mongoFactory)) gets stuck. I've seen that internally this function creates new ReactiveMongoDbStoringMessageHandler(mongoFactory), so I've tried the following (I have a transform() method, annotated with @Transformer, that converts from ConsumerRecord to Mono<String>):
public IntegrationFlow writeToMongo() {
    return IntegrationFlows.from(kafkaChannel)
            .transform(this)
            .handle(new ReactiveMongoDbStoringMessageHandler(mongoFactory))
            .get();
}
The code follows the docs https://docs.spring.io/spring-integration/reference/html/mongodb.html#mongodb-reactive-channel-adapters.
The error I get is:
java.lang.IllegalArgumentException: Found ambiguous parameter type [class java.lang.Void] for method match:
followed by a very long list of methods. Why could this happen?
You cannot use new ReactiveMongoDbStoringMessageHandler(mongoFactory) directly if you are not going to subscribe to the returned Mono. Using .handle(MongoDb.reactiveOutboundChannelAdapter(mongoFactory)) is the right way to do it, since it wraps that ReactiveMongoDbStoringMessageHandler in a ReactiveMessageHandlerAdapter for automatic subscription.
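The subscription point can be illustrated without Reactor or Spring. The following is a plain-Java sketch, not the real API: LazyResult, storingHandler and subscribingAdapter are hypothetical stand-ins for Mono, ReactiveMongoDbStoringMessageHandler and ReactiveMessageHandlerAdapter respectively. It only shows that a lazily described write does nothing until something subscribes to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class LazySubscriptionSketch {

    /** Stand-in for a reactive result: the work runs only when subscribe() is called. */
    static final class LazyResult {
        private final Runnable work;

        LazyResult(Runnable work) {
            this.work = work;
        }

        void subscribe() {
            work.run();
        }
    }

    /** Stand-in for a reactive storing handler: it describes the write but does not perform it. */
    static Function<String, LazyResult> storingHandler(List<String> store) {
        return payload -> new LazyResult(() -> store.add(payload));
    }

    /** Stand-in for an adapter that subscribes on the caller's behalf. */
    static Consumer<String> subscribingAdapter(Function<String, LazyResult> handler) {
        return payload -> handler.apply(payload).subscribe();
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>();
        Function<String, LazyResult> handler = storingHandler(store);

        // The returned LazyResult is dropped, so nothing is written.
        handler.apply("first");
        System.out.println(store.size()); // prints 0

        // The adapter subscribes, so the write actually happens.
        subscribingAdapter(handler).accept("second");
        System.out.println(store); // prints [second]
    }
}
```

In the real flow, the adapter created by MongoDb.reactiveOutboundChannelAdapter(mongoFactory) plays the role of subscribingAdapter here, which is why the bare handler should not be passed to .handle() on its own.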
However, I think your real problem is with .transform(this). I believe you have a lot of methods in this class, so be more specific and provide the method name. This has nothing to do with Project Reactor. I'm not sure, though, why one would convert to Mono before sending to that ReactiveMongoDbStoringMessageHandler... You probably also have a problem with the payload you provide (ConsumerRecord?), which is not a MongoDB mapped entity that can be saved into the collection.
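Putting these suggestions together, the flow might look roughly like the sketch below. It is a configuration fragment under assumptions, not a tested setup: it assumes a Spring Integration version that ships the MongoDb DSL factory, that kafkaChannel is a MessageChannel and mongoFactory a ReactiveMongoDatabaseFactory, and that the record key and value are strings. The class name MongoFlowConfig and the KafkaEvent entity are hypothetical. The transform method is named explicitly and returns a plain object rather than a Mono.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.mongodb.dsl.MongoDb;
import org.springframework.messaging.MessageChannel;

@Configuration
public class MongoFlowConfig {

    // Assumed to be defined elsewhere in the application context.
    private final MessageChannel kafkaChannel;
    private final ReactiveMongoDatabaseFactory mongoFactory;

    public MongoFlowConfig(MessageChannel kafkaChannel, ReactiveMongoDatabaseFactory mongoFactory) {
        this.kafkaChannel = kafkaChannel;
        this.mongoFactory = mongoFactory;
    }

    @Bean
    public IntegrationFlow writeToMongo() {
        return IntegrationFlows.from(kafkaChannel)
                // Name the method so the framework does not have to choose among all methods of this class.
                .transform(this, "transform")
                // The DSL factory wraps the storing handler so the returned Mono is subscribed to.
                .handle(MongoDb.reactiveOutboundChannelAdapter(mongoFactory))
                .get();
    }

    // Returns a plain object for MongoDB to store, not a Mono and not the raw ConsumerRecord.
    public KafkaEvent transform(ConsumerRecord<String, String> record) {
        return new KafkaEvent(record.key(), record.value());
    }

    /** Hypothetical entity used only for illustration. */
    public static class KafkaEvent {
        private final String key;
        private final String value;

        public KafkaEvent(String key, String value) {
            this.key = key;
            this.value = value;
        }

        public String getKey() {
            return key;
        }

        public String getValue() {
            return value;
        }
    }
}
```

Moving the transform method into a small dedicated class would be another way to avoid the ambiguity, since that class would then expose only one candidate method.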