Search code examples
springspring-bootapache-kafkaapache-kafka-streamsspring-kafka

How to configure a custom Kafka deserializer and get the consumed JSON data using a KafkaListener


I am trying to consume a JSON message using spring kafka. The message which is consumed by the consumer is like this.

{
    "EventHeader": {
        "entityName": "Account",
        "changedFields": ["Id", "Name"]
    },
    "NewFields": {
        "Id": "001",
        "Name": "Test Account",
    },
    "OldFields": {}
}

So far I have created classes for "EventHeader", "NewFields","OldFields" ,and for "KafkaPayload". And also I have created a custom deserializer to deserialize this JSON payload.Here is my custom deserializer.

   public class CustomDeserailizer <T extends Serializable> implements Deserializer<T> {
private ObjectMapper objectMapper = new ObjectMapper();
    public static final String VALUE_CLASS_NAME_CONFIG = "value.class.name";
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            Deserializer.super.configure(configs, isKey);
        }
    
        @Override
        public T deserialize(String topic, byte[] objectData) {
            return (objectData == null) ? null : (T) SerializationUtils.deserialize(objectData);
        }
    
        @Override
        public T deserialize(String topic, Headers headers, byte[] data) {
            return Deserializer.super.deserialize(topic, headers, data);
        }
    
        @Override
        public void close() {
            Deserializer.super.close();
        }
    }
 

I have set the consumer configurations as below.

public class KafkaConfig {
    @Bean
    public KafkaConsumer<String, KafkaPayload> consumerFactory(){
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
        return new KafkaConsumer<>(config);
    }
}

Now I need to show the consumed message through a @KafkaListener setting the consumer into ConsumerFactory. But I don't understand how to do that. This is my first time using kafka.So could anyone give me some idea about this?

This is how I am trying to do that.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> kafkaListener(){
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

This is my KafkaListener

public class ConsumerService {

    @KafkaListener(topics = "Topic", groupId = "sample-group",containerFactory = "kafkaListener")
    public void consume(KafkaPayload kafkaPayload){

        System.out.println("Consumed Message :"+ kafkaPayload);
    }

}

Solution

  • Since you are using Spring Boot, just set the value deserializer class name as a property and Boot will automatically wire it into the container factory for your @KafkaListener. No need to define your own consumer factory or container factory.

    spring.kafka.consumer.value-deserializer=com.acme.CustomDeserializer
    

    https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#application-properties.integration.spring.kafka.consumer.value-deserializer