I want to send Java Object through Kafka and receive them as Java Object as well. I thought the following configuration for ProducerFactory and ConsumerFactory will suffice, but I am receiving payload as org.apache.kafka.clients.consumer.ConsumerRecord. What do I need to change in ConsumerFactory?
Consumer Configuration:
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
public ConsumerFactory<String, Object> consumerFactory(String groupId) {
Map<String, Object> props = new HashMap<>();
JsonDeserializer<Object> deserializer = new JsonDeserializer<>(Object.class);
deserializer.addTrustedPackages("*");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> headersKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("headers"));
return factory;
}
}
Producer Configuration:
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
As you mentioned you can get the java object using the value of the consumer record.
@component
public class Listener {
@kafkaListener
void listen(ConsumerRecord<String, Car> record) {
Car car = record.value();
}
}
by accessing the record.value(). You can get the relevant java object for that you have to have to deserializers in place.