Search code examples
jsonspring-bootapache-kafkaspring-kafka

Deserialize kafka messages in KafkaConsumer using springboot


I have a springboot app that listen kafka messages and convert them to object

@KafkaListener(topics = "test", groupId = "group_id")
public void consume(String message) throws IOException {
        ObjectMapper objectMapper = new ObjectMapper();
        Hostel hostel = objectMapper.readValue(message, Hostel.class);
}

I woder if it is possible to do ti directly

@KafkaListener(topics = "test", groupId = "group_id")
public void consume(Hostel hostel) throws IOException { 
}

Solution

  • You can do it using spring-kafka. But then you need to use a custom deserializer (or a JsonDeserializer) in the container factory

    @KafkaListener(topics = "test", groupId = "my.group", containerFactory = "myKafkaFactory")
    fun genericMessageListener(myRequest: MyRequest, ack: Acknowledgment) {
    //do Something with myRequest
    ack.acknowledge()
    }
    

    Your ContainerFactory will look something like

    @Bean
    fun myKafkaFactory(): ConcurrentKafkaListenerContainerFactory<String, MyRequest> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, MyRequest>()
    factory.consumerFactory = DefaultKafkaConsumerFactory(configProps(), StringDeserializer(), MyRequestDeserializer())
    factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
    return factory
    }
    

    Your Deserialiser will look like

    public class MyRequestDeserializer implements Deserializer {
    private static ObjectMapper objectMapper = new ObjectMapper();
    
    @Override
    public void configure(Map map, boolean b) {
    }
    
    @Override
    public MyRequest deserialize(String arg0, byte[] msgBytes) {
        try {
            return objectMapper.readValue(new String(msgBytes), MyRequest.class);
        } catch (IOException ex) {
            log.warn("JSON parse/ mapping exception occurred. ", ex);
            return new MyRequest();
        }
    }
    
    @Override
    public void close() {
        log.debug("MyRequestDeserializer closed");
    }
    }
    

    Alternatively, you can use the default JsonDeserializer as given in spring docs