In a Spring Boot application I'm using a class annotated with @KafkaListener as a message listener. I want to add a ConsumerRebalanceListener to my application to manage cached data on a rebalance.
How do I add a ConsumerRebalanceListener to a ConcurrentKafkaListenerContainerFactory? The documentation says that it should be set on a ContainerProperties object, but it's not clear how to access that object in order to set it. Additionally, it looks like the ConcurrentKafkaListenerContainerFactory throws away the rebalance listener, since it creates a new ContainerProperties object when creating a listener container instance.
I feel like I'm missing something really obvious here. Before this commit, there was a method to set the rebalance listener directly on the ConcurrentKafkaListenerContainerFactory.
Consider using this method on the ConcurrentKafkaListenerContainerFactory:
/**
 * Obtain the properties template for this factory - set properties as needed
 * and they will be copied to a final properties instance for the endpoint.
 * @return the properties.
 */
public ContainerProperties getContainerProperties() {
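This is where you can add your ConsumerRebalanceListener. Because the properties set on this template are copied to the final ContainerProperties instance of each listener container, the listener is not thrown away. You can @Autowired the auto-configured ConcurrentKafkaListenerContainerFactory and set the listener on its properties template: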
@Autowired
private ConcurrentKafkaListenerContainerFactory containerFactory;

@PostConstruct
public void init() {
    // Set the listener on the factory's properties template;
    // it is copied to the properties of each container the factory creates.
    this.containerFactory.getContainerProperties()
            .setConsumerRebalanceListener(myConsumerRebalanceListener());
}

@Bean
public ConsumerRebalanceListener myConsumerRebalanceListener() {
    return new ConsumerRebalanceListener() {
        ...
    };
}
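As for what the listener body might do with cached data, here is a minimal sketch of one possible approach, not something taken from the question. It assumes the cache can be scoped per partition, so entries for a partition are dropped when it is revoked and an empty slot is created when it is assigned. The class name PartitionScopedCache and its put/get/owns helpers are hypothetical. The partition key type P is generic only so the sketch compiles without Kafka on the classpath; in the application it would presumably be org.apache.kafka.common.TopicPartition, and the two callbacks would be invoked from the ConsumerRebalanceListener methods of the same names.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: a cache whose entries are grouped by the partition they belong to.
// P stands in for Kafka's TopicPartition (an assumption made to keep the sketch self-contained).
class PartitionScopedCache<P, V> {

    private final Map<P, Map<String, V>> byPartition = new ConcurrentHashMap<>();

    // Intended to be called from ConsumerRebalanceListener.onPartitionsRevoked:
    // discard everything cached for partitions this consumer is giving up.
    public void onPartitionsRevoked(Collection<P> partitions) {
        partitions.forEach(byPartition::remove);
    }

    // Intended to be called from ConsumerRebalanceListener.onPartitionsAssigned:
    // start with an empty cache slot for each newly owned partition.
    public void onPartitionsAssigned(Collection<P> partitions) {
        partitions.forEach(p -> byPartition.computeIfAbsent(p, k -> new ConcurrentHashMap<>()));
    }

    // Writes for partitions that are not currently owned are ignored.
    public void put(P partition, String key, V value) {
        Map<String, V> entries = byPartition.get(partition);
        if (entries != null) {
            entries.put(key, value);
        }
    }

    // Returns null when the partition is not owned or the key is not cached.
    public V get(P partition, String key) {
        Map<String, V> entries = byPartition.get(partition);
        return entries == null ? null : entries.get(key);
    }

    public boolean owns(P partition) {
        return byPartition.containsKey(partition);
    }
}
```

With something like this in place, the anonymous listener above would only need to forward its two callbacks to the cache, and the @KafkaListener method could read and write cached values keyed by the partition of the record it is processing.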