Kafka: Different Deserializers For Different Topics

I am currently consuming messages from different Kafka topics using Spring Boot and the @KafkaListener annotation.

All of the topics currently contain messages in Avro format (I'm using KafkaAvroDeserializer).

But now, I have to read a new topic, which contains messages in JSON format.

Is it possible to use JsonDeserializer with this new topic and KafkaAvroDeserializer with the legacy topics?


  • I would suggest using the following configuration for declaring Kafka listener container factories:

    @Configuration
    public class KafkaConsumersConfig {
      @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
      private String bootstrapServers;

      // Builds a consumer factory for the given pair of key and value deserializers.
      private <K, V> ConsumerFactory<K, V> generateFactory(
          Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(props, keyDeserializer, valueDeserializer);
      }

      @Bean
      public ConcurrentKafkaListenerContainerFactory<String, YourClass>
          kafkaListenerContainerFactoryJson() {
        ConcurrentKafkaListenerContainerFactory<String, YourClass> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        var consumerFactory = generateFactory(new StringDeserializer(), new JsonDeserializer<>(YourClass.class, false));
        // Without this call the container factory has no consumer factory to create consumers from.
        factory.setConsumerFactory(consumerFactory);
        return factory;
      }

      //TODO: Declare bean for Avro deserializer
    }

    Here, the generateFactory method builds a consumer factory from the given key and value deserializers. Declare a separate ConcurrentKafkaListenerContainerFactory bean for each distinct pair of key and value deserializers.

    Accordingly, the @KafkaListener should be declared as shown below.

      @KafkaListener(
          topics = "${kafka.topic.jsonTopic}",
          containerFactory = "kafkaListenerContainerFactoryJson")
      public void consume(ConsumerRecord<String, YourClass> record) {
        // ...
      }

    Here, containerFactory refers to the name of the container factory bean declared in the configuration class (by default, the name of the @Bean method).
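
    The Avro bean left as a TODO in the configuration class above could look roughly like the sketch below. It assumes Confluent's KafkaAvroDeserializer backed by a Schema Registry. The class name KafkaAvroConsumersConfig, the bean name kafkaListenerContainerFactoryAvro and the kafka.schema-registry-url property are placeholders, so adjust them to your setup. The deserializers are passed as classes here, so that the Kafka client configures them from the properties (the Avro deserializer needs the registry URL).

    ```java
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @Configuration
    public class KafkaAvroConsumersConfig {

      @Value("${spring.kafka.consumer.bootstrap-servers}")
      private String bootstrapServers;

      // Assumption: a hypothetical property that holds the Schema Registry URL.
      @Value("${kafka.schema-registry-url}")
      private String schemaRegistryUrl;

      @Bean
      public ConcurrentKafkaListenerContainerFactory<String, Object>
          kafkaListenerContainerFactoryAvro() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Deserializers are given as classes; the Kafka client instantiates
        // and configures them from these properties.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", schemaRegistryUrl);

        ConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
      }
    }
    ```

    The listeners for the legacy topics would then set containerFactory = "kafkaListenerContainerFactoryAvro", while the new JSON listener keeps "kafkaListenerContainerFactoryJson". With this configuration the values arrive as Avro GenericRecord instances unless you also set "specific.avro.reader" to true.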