Search code examples
javaspring-bootapache-kafkaspring-kafka

Send multiples DTOs using Kafka Template


What's the better approach to send multiple types of DTOs using Kafka Template?

Approach 1: Use Object as a type value for the ProducerFactory so I can send many types of Objects using my Kafka Template.

@Bean
public ProducerFactory<String, Object> ProducerFactory() {
    Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaTemplate<String, Object> KafkaTemplate() {
    return new KafkaTemplate<>(ProducerFactory());
}

Approach 2: Add another ProducerFactory and Kafka Template configuration for each Object I want to send.

@Bean
public ProducerFactory<String, Student> ProducerFactory1() {
    Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(config);
}

public ProducerFactory<String, Person> ProducerFactory2() {
    Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaTemplate<String, Student> leadTimeKafkaTemplate() {
    return new KafkaTemplate<>(ProducerFactory());
}
 @Bean
    public KafkaTemplate<String, Person> leadTimeKafkaTemplate() {
        return new KafkaTemplate<>(ProducerFactory());
    }

Solution

  • Using KafkaTemplate<String, Object> KafkaTemplate() is fine when using a serializer such as JSON.

    In fact, the generic type on the template is only important when using one of the receive() methods.