java spring-boot apache-kafka spring-kafka

Kafka: Different Deserializers For Different Topics


I am currently consuming messages from different Kafka topics using Spring Boot and the @KafkaListener annotation.

All of the topics currently contain messages in Avro format (I'm using KafkaAvroDeserializer).

But now, I have to read a new topic, which contains messages in JSON format.

Is it possible to use JsonDeserializer with this new topic and KafkaAvroDeserializer with the legacy topics?


Solution

  • I would suggest using the following configuration for declaring Kafka listener container factories:

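    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.support.serializer.JsonDeserializer;
    
    @Configuration
    public class KafkaConsumersConfig {
      @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
      private String bootstrapServers;
    
      // Builds a consumer factory for the given pair of key and value deserializers.
      private <K, V> ConsumerFactory<K, V> generateFactory(
          Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Allow JsonDeserializer to deserialize classes from any package.
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(props, keyDeserializer, valueDeserializer);
      }
    
      // Container factory for topics whose values are JSON-encoded YourClass instances.
      @Bean
      public ConcurrentKafkaListenerContainerFactory<String, YourClass>
          kafkaListenerContainerFactoryJson() {
        ConcurrentKafkaListenerContainerFactory<String, YourClass> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        // false = ignore type headers and always deserialize to YourClass.
        ConsumerFactory<String, YourClass> consumerFactory =
            generateFactory(new StringDeserializer(), new JsonDeserializer<>(YourClass.class, false));
        factory.setConsumerFactory(consumerFactory);
        return factory;
      }
    
      //TODO: Declare bean for Avro deserializer
    }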
    

    Here, the generateFactory method builds a consumer factory from the given key and value deserializers. Declare a separate ConcurrentKafkaListenerContainerFactory bean for each distinct pair of key and value deserializers.

    The @KafkaListener is then declared as follows:

    @KafkaListener(
        topics = "${kafka.topic.jsonTopic}",
        containerFactory = "kafkaListenerContainerFactoryJson")
    public void consume(ConsumerRecord<String, YourClass> record) {
      // ...
    }
    

    Here, containerFactory must match the name of the corresponding bean in the configuration class (by default, the name of the @Bean method).
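
The configuration class leaves the Avro bean as a TODO. A minimal sketch of what a second container factory for the legacy Avro topics might look like is given here, assuming the Confluent KafkaAvroDeserializer and spring-kafka are on the classpath. The class name KafkaAvroConsumersConfig, the bean name kafkaListenerContainerFactoryAvro, and the kafka.schema-registry-url property are hypothetical names chosen for illustration, not part of the original answer. The deserializers are passed by class name in this sketch so that the Kafka client configures them from the same properties map, which is how the registry URL reaches KafkaAvroDeserializer.

```java
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaAvroConsumersConfig {
  @Value("${spring.kafka.consumer.bootstrap-servers}")
  private String bootstrapServers;

  // Hypothetical property name; use whichever key holds your schema registry URL.
  @Value("${kafka.schema-registry-url}")
  private String schemaRegistryUrl;

  // Note: group.id is assumed to be supplied elsewhere, for example through
  // the groupId attribute of @KafkaListener.
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object>
      kafkaListenerContainerFactoryAvro() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Deserializers are given by class so the Kafka client instantiates and
    // configures them with the properties below.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put("schema.registry.url", schemaRegistryUrl);
    // Assumption: values map to generated Avro classes. Without this setting
    // the deserializer returns GenericRecord instances instead.
    props.put("specific.avro.reader", true);

    ConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(props);

    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
  }
}
```

A listener for a legacy topic would then set containerFactory = "kafkaListenerContainerFactoryAvro", while the JSON listener keeps pointing at kafkaListenerContainerFactoryJson, so each topic is read with its own deserializer.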