Search code examples
javaapache-kafkaspring-kafka

Adding ConsumerRebalanceListener to the ConcurrentKafkaListenerContainerFactory


In a Spring Boot application I'm using a class annotated with @KafkaListener as a message listener. I want to add a ConsumerRebalanceLister to my application to manage cached data on a rebalance.

How do I add a ConsumerRebalanceListener to a ConcurrentKafkaListenerContainerFactory. The documentation says that it should be set on a ContainerProperties object. It's not clear how to access that object in order to set it. Additionally, it looks like the ConcurrentKafkaListenerContainerFactory throws away the rebalance listener since it creates a new ContainerProperties object when creating a listener container instance.

I feel like I'm missing something really obvious here, before this commit there was a method to simply set the rebalance listener directly on the ConcurrentKafkaListenerContainerFactory.


Solution

  • Consider to use this method on the ConcurrentKafkaListenerContainerFactory:

    /**
     * Obtain the properties template for this factory - set properties as needed
     * and they will be copied to a final properties instance for the endpoint.
     * @return the properties.
     */
    public ContainerProperties getContainerProperties() {
    

    This is where you can add your ConsumerRebalanceListener. You @Autowired an auto-configured ConcurrentKafkaListenerContainerFactory and perform the mentioned injection:

    @Autowired
    private ConcurrentKafkaListenerContainerFactory containerFactory;
    
    @PostConstruct
    public void init() {
        this.containerFactory.getContainerProperties()
                .setConsumerRebalanceListener(myConsumerRebalanceListener());
    }
    
    @Bean
    public ConsumerRebalanceListener myConsumerRebalanceListener() {
        return new ConsumerRebalanceListener() {
            ...
        };
    }