Tags: spring, spring-boot, apache-kafka, spring-kafka

How to provide topic specific properties in Spring Kafka?


In my application, I am using Spring Kafka with multiple consumers, each consuming from a separate topic. I want to configure the consumer properties in application.properties. Below are some of the properties that I want to set:

spring.kafka.consumer.group-id=test
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=<Deserializer.class>
spring.kafka.consumer.value-deserializer=<Deserializer.class>
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=<Deserializer.class>
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=<Deserializer.class>

I want to set all of the above configs per topic, without creating a custom ContainerFactory for each one. How can I do that?


Solution

  • You can override specific properties in the @KafkaListener. This was added to avoid having to create multiple factories.

    See https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-properties

    Starting with version 2.2.4, you can specify Kafka consumer properties directly on the annotation, these will override any properties with the same name configured in the consumer factory. You cannot specify the group.id and client.id properties this way; they will be ignored; use the groupId and clientIdPrefix annotation properties for those.

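    @KafkaListener(topics = "myTopic", groupId = "group", properties = {
        "max.poll.interval.ms:60000",
        ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
    })
    public void listen1(String in) {
        // Assumes the factory's value deserializer produces a String
        System.out.println("1: " + in);
    }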
    
    @KafkaListener(id = "two", topics = "two",
            properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
    public void listen2(byte[] in) {
        System.out.println("2: " + new String(in));
    }
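
The entries in the properties attribute are documented as using the normal Java Properties file format, which is why both "key:value" and "key=value" appear above. The following plain-Java sketch only mimics the documented override behavior; it is not Spring Kafka's own code, and the class name ListenerPropertyOverrides and the merge method are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class ListenerPropertyOverrides {

    // Keys the quoted documentation says are ignored when set via `properties`.
    private static final Set<String> IGNORED = Set.of("group.id", "client.id");

    /** Returns a copy of the factory defaults with the per-listener entries applied on top. */
    public static Map<String, Object> merge(Map<String, Object> factoryDefaults,
                                            String... listenerProperties) {
        Map<String, Object> merged = new HashMap<>(factoryDefaults);
        for (String entry : listenerProperties) {
            Properties parsed = new Properties();
            try {
                // Properties syntax accepts "key:value", "key=value" and "key value".
                parsed.load(new StringReader(entry));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            for (String key : parsed.stringPropertyNames()) {
                if (!IGNORED.contains(key)) {
                    merged.put(key, parsed.getProperty(key));
                }
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> defaults = Map.of(
                "group.id", "test",
                "auto.offset.reset", "latest",
                "max.poll.records", "500");
        Map<String, Object> merged = merge(defaults,
                "max.poll.interval.ms:60000",
                "max.poll.records=100",
                "group.id=other");
        System.out.println(merged.get("max.poll.records")); // prints 100
        System.out.println(merged.get("group.id"));         // prints test
    }
}
```

In this sketch the listener-level value wins for max.poll.records, while the group.id entry is dropped, matching the rule that groupId must be set through its own annotation attribute.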
    

    The property values can themselves contain property placeholders, so each topic's settings can be resolved from application.properties:

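    @KafkaListener(topics = "myTopic", groupId = "group", properties = {
        "max.poll.interval.ms:${myTopic.mpi:60000}",
        ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
    })
    public void listen3(String in) {
        // Assumes the factory's value deserializer produces a String
        System.out.println("3: " + in);
    }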
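
The placeholder ${myTopic.mpi:60000} means "use the myTopic.mpi property if it is defined, otherwise fall back to 60000". Spring resolves this itself; the sketch below is a simplified stand-in for that lookup-with-default rule, not Spring's resolver. The class PlaceholderSketch and its resolve method are hypothetical, and nested placeholders are not handled.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderSketch {

    // Matches ${name} or ${name:default}.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    /** Replaces each placeholder with the environment value, or its default if absent. */
    public static String resolve(String text, Map<String, String> environment) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = environment.get(m.group(1));
            if (value == null) {
                value = m.group(2); // default after the colon, may be null
            }
            if (value == null) {
                throw new IllegalArgumentException(
                        "Could not resolve placeholder '" + m.group(1) + "'");
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String entry = "max.poll.interval.ms:${myTopic.mpi:60000}";
        // No property defined: the default is used.
        System.out.println(resolve(entry, Map.of()));
        // Property defined, e.g. myTopic.mpi=30000 in application.properties.
        System.out.println(resolve(entry, Map.of("myTopic.mpi", "30000")));
    }
}
```

With this pattern, adding a line such as myTopic.mpi=30000 to application.properties changes the value for that one listener without touching the shared spring.kafka.consumer.* settings.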
    

    Similarly, you can override producer properties for an individual KafkaTemplate by passing a map of overrides to its constructor along with the producer factory.

    Another alternative is to add a ContainerCustomizer to the container factory (via setContainerCustomizer), which lets you override properties for specific containers.