Search code examples
spring-bootapache-kafkaspring-kafka

spring kafka registerListenerContainer method startImmediately flag true or false?


I am using spring boot 2.7 + kafka 3.0.

I have a bean in my configuration as below to define the KafkaListenerEndpointRegistry.

In this bean, I need to register my listener with method registerListenerContainer.

  @Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
   public KafkaListenerEndpointRegistry defaultKafkaListenerEndpointRegistry(Map<String, RetryConfig> retryConfigMap) {
        return new KafkaListenerEndpointRegistry() {
            @Override
            public void registerListenerContainer(
                    KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
                // register original one
                MethodKafkaListenerEndpoint<Object, Object> methodEndpoint = (MethodKafkaListenerEndpoint<Object, Object>) endpoint;
                String originalTopic = new ArrayList<String>(endpoint.getTopics()).get(0);
               

                super.registerListenerContainer(methodEndpoint, factory);
                ...
                
            }
        };
    }

The method registerListenerContainer has a property startImmediately which by default is false, Should I set it to true, From the doc it says:

The startImmediately flag determines if the container should be started immediately.

But This is really not clear to me, what's the difference of the different setting? and what impact would it be if set to false?


Solution

  • If you set it to false, then you have to start a container for that endpoint manually later. Use KafkaListenerEndpointRegistry.getListenerContainer(String id) API to call its start(). Otherwise it is started automatically by the ApplicationContext lifecycle: or after the whole refresh, or immediately if you register that endpoint at runtime.

    Not clear, though, why would one do that KafkaListenerEndpointRegistry overriding if in most cases the @KafkaListener is enough. The topic retry can be handled by the RetryableTopic.