I want to read from data from Kafka, and on each event, read from MongoDB with the Id and another field from the Kafka event. I wonder in general what is the recommended way to do this, and whether it is possible to do this with the ReactiveMongoDbMessageSource
. I thought that maybe the right operator is .gateway()
or .enrich()
but I'm really not sure. I don't really have a clue about how to use this with a message source so I'm not sure that it's even possible. I'd like to be able to write something like this:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(reactiveKafkaConsumerTemplate.receiveAutoAck()
.map(GenericMessage::new))
.<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
.gateway((message) -> enrichMongoDbPayloadByMessageKey(message.getHeaders().getId())
.handle(new ReactiveElasticsearchMessageHandler());
}
I'd really like to see an example for a mock implementation of my needed enrichMongoDbPayloadByMessageKey()
.
The gateway()
or enricher()
is right direction, depending on your requirements if you'd like to continue the flow with only a result from the MongoDb request, or you want to add more data to the result of that transform()
.
The ReactiveMongoDbMessageSource
is a wrong direction here just because it is used as a source of messages - the beginning of the flow. In your case it is really a service activator based on the result received from the Kafka.
There is no (yet) reactive MongoDb gateway (request-reply channel adapter), but the closest out-of-the-box solution is a MongoDbOutboundGateway
: https://docs.spring.io/spring-integration/docs/current/reference/html/mongodb.html#mongodb-outbound-gateway.
If you really wish to deal here with reactive solution, consider to implement the service method which would receive your arguments, perform a reactive operation on the MongoDB and return you something. See for that goal a ReactiveMongoTemplate.findOne(Query query, Class<T> entityClass)
.
There is no the gateway()
operator with a signature you show.
It is also wrong to use message.getHeaders().getId()
since it does not reflect anything you receive from Kafka.
See more docs about gateway and enricher:
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-gateway