Search code examples
java spring-boot apache-kafka spring-kafka

Send / Receive Java Objects through Kafka


I want to send Java objects through Kafka and receive them as Java objects as well. I thought the following configuration for the ProducerFactory and ConsumerFactory would suffice, but I am receiving the payload as org.apache.kafka.clients.consumer.ConsumerRecord. What do I need to change in the ConsumerFactory?

Consumer Configuration:

/**
 * Spring configuration for the Kafka consumer side: String keys and
 * JSON-deserialized values.
 */
@Configuration
public class KafkaConsumerConfig {

    /** Broker address(es), injected from the {@code kafka.bootstrapAddress} property. */
    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Builds a consumer factory for the given consumer group.
     *
     * <p>This method is not a bean; it is called directly with the group id
     * by the container-factory bean below.
     *
     * @param groupId value stored under {@code ConsumerConfig.GROUP_ID_CONFIG}
     * @return a factory using a {@code StringDeserializer} for keys and a
     *         {@code JsonDeserializer} targeting {@code Object} for values
     */
    public ConsumerFactory<String, Object> consumerFactory(String groupId) {
        JsonDeserializer<Object> valueDeserializer = new JsonDeserializer<>(Object.class);
        // NOTE(review): "*" trusts every package for JSON type resolution;
        // narrow this if the topic can carry untrusted payloads.
        valueDeserializer.addTrustedPackages("*");

        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

        return new DefaultKafkaConsumerFactory<>(
                consumerProps, new StringDeserializer(), valueDeserializer);
    }

    /**
     * Listener container factory backed by a consumer factory for the
     * {@code "headers"} consumer group.
     *
     * @return the configured container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> headersKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory("headers"));
        return containerFactory;
    }

}

Producer Configuration:

/**
 * Spring configuration for the Kafka producer side: String keys and
 * JSON-serialized values.
 */
@Configuration
public class KafkaProducerConfig {

    /** Broker address(es), injected from the {@code kafka.bootstrapAddress} property. */
    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Producer factory configured with the bootstrap address, a
     * {@code StringSerializer} for keys and a {@code JsonSerializer} for values.
     *
     * @return the producer factory bean
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Template for sending messages, built on {@link #producerFactory()}.
     *
     * @return the Kafka template bean
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        ProducerFactory<String, Object> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

}


Solution

  • As you mentioned, you can get the Java object from the value of the consumer record:

     /**
      * Example listener that receives the full {@code ConsumerRecord} and
      * reads the deserialized payload from it.
      */
     @Component
     public class Listener {

         /**
          * Handles one record; the payload object is obtained with
          * {@code record.value()}.
          *
          * @param record the consumed record whose value is the {@code Car} payload
          */
         // TODO: "cars" is a placeholder - replace it with the real topic name.
         // containerFactory selects the headersKafkaListenerContainerFactory bean
         // so the JSON value deserializer configured there is used.
         @KafkaListener(topics = "cars", containerFactory = "headersKafkaListenerContainerFactory")
         void listen(ConsumerRecord<String, Car> record) {
             Car car = record.value();
         }
     }
    

    By calling record.value() you get the relevant Java object; for that to work, you have to have the appropriate deserializers in place.