Search code examples
springspring-bootspring-integrationspring-kafkaspring-integration-dsl

Spring Integration Java DSL with Kafka


I am doing a poc with kafka spring integration Java DSL I am reading a row from data base(DB) and send that row as msg to Kafka Topic. please find the code below. Code is compiling and i can able to fetch the record from DB, but i didn't see any msg in topic.

@Configuration
public class KafkaProduceConfig {
 
 @Bean
    public IntegrationFlow pollingAdapterFlow(EntityManagerFactory entityManagerFactory, MyTransformer transformer) {
        return IntegrationFlow
                .from(Jpa.inboundAdapter(entityManagerFactory).entityClass(MyRecord.class),                 
                        e -> e.poller(p -> p.cron("*/1 * * * * *").maxMessagesPerPoll(1).transactional())
                                  .autoStartup(true))
                .log(message -> "Polled DB Records from KafkaProduceConfig : " + message.getPayload())
                .split()
                .log(message -> "Record after split : " + message.getPayload())
                .enrichHeaders(hrdSpec ->hrdSpec.headerExpression("myRecord", "payload",true))
                .transform(transformer,"getCustomeRecord")
                .enrichHeaders(hrdSpec ->hrdSpec.headerExpression("customeRecord","payload",true))
                .log(message -> "Transformed Record : " + message.getPayload() +",topic :" +message.getHeaders().get("topic"))
                .channel("sendToKafka")
                .get();
    }


    @Bean
    public IntegrationFlow outboundChannelAdapterFlow() {
        return IntegrationFlow.from("sendToKafka")
                .log(message -> "outboundChannelAdapterFlow received payload : " + message.getPayload() +",topic :"
                        +message.getHeaders().get("topic")+"key :"+message.getHeaders().get("key"))
                .handle(m->Kafka.outboundChannelAdapter(producerFactory()).topic(m.getHeaders().get("topic").toString())
                        .messageKey(m.getHeaders().get("key").toString())
                       // .headerMapper(mapper())
                        .partitionId((Integer) m.getHeaders().get("partitionId")))
                .get();

    }


    public ProducerFactory<Integer, String> producerFactory() {
    
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(props);
    }   
}

Msg should publish to Kafka topic.


Solution

  • The configuration .handle(m->Kafka.outboundChannelAdapter(producerFactory()) is not correct. That lambda makes a new MessageHandler which body is just to use that Kafka factory whenever a new message arrives. This code just does not handle this message.

    You must look into a handle() variant where you provide a MessageHandler from the factory, not a new by lambda.

    So, something like this:

            .handle(Kafka.outboundChannelAdapter(producerFactory())
                        .topic(m -> m.getHeaders().get("topic").toString())
                        .messageKey(m -> m.getHeaders().get("key").toString())
                   // .headerMapper(mapper())
                        .partitionId(m -> (Integer) m.getHeaders().get("partitionId")))
    

    This way the MessageHandler is going to be created during configuration phase. And at runtime its handleMessage() method is going to be called against request message. All those option are now lambdas to be called at runtime.

    P.S. Please, edit your question for more readable code snippets.