Tags: spring, spring-boot, spring-integration, project-reactor

Spring Integration recommended way of enriching events with key based queries


I want to read data from Kafka and, for each event, query MongoDB by the Id and another field taken from the Kafka event. What is the recommended way to do this in general, and is it possible with the ReactiveMongoDbMessageSource? I thought the right operator might be .gateway() or .enrich(), but I'm really not sure. I also don't know how to use either of them with a message source, so I'm not sure it's even possible. I'd like to be able to write something like this:

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                        .map(GenericMessage::new))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .gateway((message) -> enrichMongoDbPayloadByMessageKey(message.getHeaders().getId()))
            .handle(new ReactiveElasticsearchMessageHandler());
    }

I'd really like to see an example of a mock implementation of the enrichMongoDbPayloadByMessageKey() method I need.


Solution

  • The gateway() or enrich() operator is the right direction. Which one depends on your requirements: use gateway() if you'd like to continue the flow with only the result of the MongoDB request, or enrich() if you want to add more data to the result of that transform().

    The ReactiveMongoDbMessageSource is the wrong direction here because it is a source of messages, that is, the beginning of a flow. What you need in your case is really a service activator that acts on the data received from Kafka.

    There is no reactive MongoDB gateway (request-reply channel adapter) yet; the closest out-of-the-box solution is the MongoDbOutboundGateway: https://docs.spring.io/spring-integration/docs/current/reference/html/mongodb.html#mongodb-outbound-gateway.

    If you really want a reactive solution here, consider implementing a service method that receives your arguments, performs a reactive operation on MongoDB, and returns the result. For that, see ReactiveMongoTemplate.findOne(Query query, Class<T> entityClass).

    There is no gateway() operator with the signature you show. It is also wrong to use message.getHeaders().getId(): that id is a UUID generated by the framework for each message, so it does not reflect anything you receive from Kafka.

    See more docs about gateway and enricher:

    https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-gateway

    https://docs.spring.io/spring-integration/docs/current/reference/html/message-transformation.html#payload-enricher
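
The service-method idea above, and the mock the question asks for, can be sketched without any Spring or MongoDB dependency. This is only a minimal sketch under stated assumptions: `MockDocumentStore`, `KafkaEvent`, and the `tenant` field are hypothetical names, an in-memory map stands in for the collection, and `CompletableFuture` stands in for the `Mono` that `ReactiveMongoTemplate.findOne(...)` would return. `lookupByMessageKey` mirrors the gateway() case (the flow continues with only the lookup result), while `enrichMongoDbPayloadByMessageKey` mirrors the enricher case (the lookup result is added to the original payload). Both take the key from the Kafka record rather than from the message id header.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a MongoDB collection.
final class MockDocumentStore {
    private final Map<String, Map<String, Object>> documentsById = new ConcurrentHashMap<>();

    void save(String id, Map<String, Object> document) {
        documentsById.put(id, Map.copyOf(document));
    }

    // Asynchronous lookup by id plus one extra field ("tenant" is an assumed name).
    // Completes with Optional.empty() when nothing matches or the id is null.
    CompletableFuture<Optional<Map<String, Object>>> findOne(String id, String tenant) {
        if (id == null) {
            return CompletableFuture.completedFuture(Optional.empty());
        }
        return CompletableFuture.supplyAsync(() ->
                Optional.ofNullable(documentsById.get(id))
                        .filter(doc -> tenant != null && tenant.equals(doc.get("tenant"))));
    }
}

final class EnrichmentSketch {

    // Hypothetical shape of what was read from Kafka: record key, extra field, value.
    static final class KafkaEvent {
        final String key;
        final String tenant;
        final String value;

        KafkaEvent(String key, String tenant, String value) {
            this.key = key;
            this.tenant = tenant;
            this.value = value;
        }
    }

    // Gateway style: the result replaces the payload (empty map when nothing is found).
    static CompletableFuture<Map<String, Object>> lookupByMessageKey(
            MockDocumentStore store, KafkaEvent event) {
        return store.findOne(event.key, event.tenant)
                .thenApply(found -> found.orElse(Map.<String, Object>of()));
    }

    // Enricher style: the original key and value are kept, and the document,
    // if one was found, is added under the "mongo" entry.
    static CompletableFuture<Map<String, Object>> enrichMongoDbPayloadByMessageKey(
            MockDocumentStore store, KafkaEvent event) {
        return store.findOne(event.key, event.tenant).thenApply(found -> {
            Map<String, Object> enriched = new LinkedHashMap<>();
            enriched.put("key", event.key);
            enriched.put("value", event.value);
            found.ifPresent(doc -> enriched.put("mongo", doc));
            return enriched;
        });
    }

    public static void main(String[] args) {
        MockDocumentStore store = new MockDocumentStore();
        store.save("42", Map.of("tenant", "acme", "name", "Widget"));
        KafkaEvent event = new KafkaEvent("42", "acme", "{\"qty\":3}");
        System.out.println(lookupByMessageKey(store, event).join());
        System.out.println(enrichMongoDbPayloadByMessageKey(store, event).join());
    }
}
```

In a real flow, the body of `findOne` would presumably delegate to `ReactiveMongoTemplate.findOne(Query query, Class<T> entityClass)` with a query on the id and the second field, and the method would be invoked from a service activator. That wiring depends on the Spring Integration version in use and is not shown here.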