I am trying to consume a JSON message using Spring Kafka. The message received by the consumer looks like this:
{
  "EventHeader": {
    "entityName": "Account",
    "changedFields": ["Id", "Name"]
  },
  "NewFields": {
    "Id": "001",
    "Name": "Test Account"
  },
  "OldFields": {}
}
So far I have created classes for "EventHeader", "NewFields", "OldFields", and "KafkaPayload". I have also created a custom deserializer to deserialize this JSON payload. Here is my custom deserializer:
public class CustomDeserializer<T extends Serializable> implements Deserializer<T> {

    private ObjectMapper objectMapper = new ObjectMapper();

    public static final String VALUE_CLASS_NAME_CONFIG = "value.class.name";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Deserializer.super.configure(configs, isKey);
    }

    @Override
    public T deserialize(String topic, byte[] objectData) {
        return (objectData == null) ? null : (T) SerializationUtils.deserialize(objectData);
    }

    @Override
    public T deserialize(String topic, Headers headers, byte[] data) {
        return Deserializer.super.deserialize(topic, headers, data);
    }

    @Override
    public void close() {
        Deserializer.super.close();
    }
}
I have set the consumer configurations as below.
public class KafkaConfig {

    @Bean
    public KafkaConsumer<String, KafkaPayload> consumerFactory() {
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);
        return new KafkaConsumer<>(config);
    }
}
Now I need to show the consumed message through a @KafkaListener by setting the consumer into a ConsumerFactory, but I don't understand how to do that. This is my first time using Kafka. Could anyone give me some idea about this?
This is how I am trying to do that.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> kafkaListener() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
This is my KafkaListener:
public class ConsumerService {

    @KafkaListener(topics = "Topic", groupId = "sample-group", containerFactory = "kafkaListener")
    public void consume(KafkaPayload kafkaPayload) {
        System.out.println("Consumed Message :" + kafkaPayload);
    }
}
Since you are using Spring Boot, just set the value deserializer class name as a property and Boot will automatically wire it into the container factory for your @KafkaListener. No need to define your own consumer factory or container factory.
spring.kafka.consumer.value-deserializer=com.acme.CustomDeserializer
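For context, a fuller application.properties might look roughly like the sketch below. It assumes the broker address and group id from the question's KafkaConfig, and it assumes the deserializer lives in the com.acme package used above; adjust these to your project.

```properties
# Sketch only: values are taken from the question and are assumptions about your setup.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=groupId
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Fully qualified name of the custom deserializer (the package is an assumption).
spring.kafka.consumer.value-deserializer=com.acme.CustomDeserializer
```

With the custom container factory bean removed, the containerFactory = "kafkaListener" attribute on the @KafkaListener would presumably need to be dropped as well, so that the listener falls back to the factory Boot auto-configures.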