I am using Spring Boot 2.7 with Kafka 3.0.
My configuration has the bean below, which defines the KafkaListenerEndpointRegistry.
In this bean I need to register my listener through the registerListenerContainer method:
    @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry(Map<String, RetryConfig> retryConfigMap) {
        return new KafkaListenerEndpointRegistry() {
            @Override
            public void registerListenerContainer(
                    KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
                // register original one
                MethodKafkaListenerEndpoint<Object, Object> methodEndpoint = (MethodKafkaListenerEndpoint<Object, Object>) endpoint;
                String originalTopic = new ArrayList<String>(endpoint.getTopics()).get(0);
                super.registerListenerContainer(methodEndpoint, factory);
                ...
            }
        };
    }
The registerListenerContainer method has an overload that takes a startImmediately flag, which defaults to false. Should I set it to true? The documentation says:
The startImmediately flag determines if the container should be started immediately.
That is not really clear to me. What is the difference between the two settings, and what is the impact of leaving it set to false?
If you set it to false, then you have to start the container for that endpoint manually later: use the KafkaListenerEndpointRegistry.getListenerContainer(String id) API to look the container up and call its start(). Otherwise it is started automatically by the ApplicationContext lifecycle: either after the whole refresh, or immediately if you register the endpoint at runtime.
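To make the difference concrete, here is a toy, plain-JDK model of the flag. It is only a sketch: ToyRegistry and ToyContainer are hypothetical stand-ins, not Spring Kafka classes, and the model deliberately leaves out the ApplicationContext lifecycle, so it only covers an endpoint registered at runtime.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a listener container: it only tracks whether it is running.
class ToyContainer {
    private boolean running;

    void start() {
        running = true;
    }

    boolean isRunning() {
        return running;
    }
}

// Hypothetical stand-in for the registry; NOT Spring Kafka's KafkaListenerEndpointRegistry.
class ToyRegistry {
    private final Map<String, ToyContainer> containers = new LinkedHashMap<>();

    // Registers a container and starts it right away only when startImmediately is true.
    ToyContainer register(String id, boolean startImmediately) {
        ToyContainer container = new ToyContainer();
        containers.put(id, container);
        if (startImmediately) {
            container.start();
        }
        return container;
    }

    // Looks a container up by id so the caller can start it later.
    ToyContainer getListenerContainer(String id) {
        return containers.get(id);
    }
}

public class StartImmediatelyDemo {
    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        registry.register("eager", true);
        registry.register("lazy", false);

        System.out.println(registry.getListenerContainer("eager").isRunning()); // true
        System.out.println(registry.getListenerContainer("lazy").isRunning());  // false

        // Manual start, mirroring the getListenerContainer(id).start() call described above.
        registry.getListenerContainer("lazy").start();
        System.out.println(registry.getListenerContainer("lazy").isRunning());  // true
    }
}
```

In a real application the lookup-and-start step is the same call on the injected KafkaListenerEndpointRegistry; the ids "eager" and "lazy" are made up for this illustration.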
It is not clear, though, why one would override KafkaListenerEndpointRegistry like that when in most cases @KafkaListener is enough. Topic retries can be handled by @RetryableTopic.