spring-boot spring-kafka

Setting authorizationExceptionRetryInterval for Spring Kafka


Does anyone know how to set the new property authorizationExceptionRetryInterval without creating the ConcurrentKafkaListenerContainerFactory manually?


Solution

  • I was going to suggest setting a container customizer on the auto-configured factory:

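    import java.time.Duration;

    import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
    import org.springframework.stereotype.Component;

    @Component
    class ContainerFactoryCustomizer {

        // Spring injects the auto-configured listener container factory.
        ContainerFactoryCustomizer(AbstractKafkaListenerContainerFactory<?, ?, ?> factory) {
            factory.setContainerCustomizer(
                    container -> container.getContainerProperties()
                            .setAuthorizationExceptionRetryInterval(Duration.ofSeconds(10L)));
        }

    }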
    

    But that doesn't work, due to a bug (the container customizer is not set up).

    Here is a work-around:

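    import java.time.Duration;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.listener.MessageListenerContainer;

    @SpringBootApplication
    public class So60054097Application {

        public static void main(String[] args) {
            SpringApplication.run(So60054097Application.class, args);
        }

        // autoStartup = "false" keeps the container stopped until the runner below starts it.
        @KafkaListener(id = "so60054097", topics = "so60054097", autoStartup = "false")
        public void listen(String in) {
            System.out.println(in);
        }

        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so60054097").partitions(1).replicas(1).build();
        }

        @Bean
        public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
            return args -> {
                // Look up the container by the listener id, set the property, then start it.
                MessageListenerContainer container = registry.getListenerContainer("so60054097");
                container.getContainerProperties()
                        .setAuthorizationExceptionRetryInterval(Duration.ofSeconds(10L));
                container.start();
            };
        }
    }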
    
    

    (Set autoStartup to "false" on the listener, set the property on the container, then start the container yourself.)
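
    If an application has several listeners, the same steps could be applied to every registered container instead of looking one up by id. The following is only a sketch under some assumptions: every @KafkaListener is declared with autoStartup = "false", the Spring Kafka version in use still names the setter setAuthorizationExceptionRetryInterval, and the class name AuthRetryIntervalConfig and bean name authRetryRunner are made up for illustration.

    ```java
    import java.time.Duration;

    import org.springframework.boot.ApplicationRunner;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.listener.MessageListenerContainer;

    @Configuration
    class AuthRetryIntervalConfig { // hypothetical class name

        @Bean
        ApplicationRunner authRetryRunner(KafkaListenerEndpointRegistry registry) {
            return args -> {
                // Assumes all listeners were declared with autoStartup = "false".
                for (MessageListenerContainer container : registry.getListenerContainers()) {
                    container.getContainerProperties()
                            .setAuthorizationExceptionRetryInterval(Duration.ofSeconds(10L));
                    // Start only containers that are not already running.
                    if (!container.isRunning()) {
                        container.start();
                    }
                }
            };
        }
    }
    ```

    The property is set before start() because a container that is already running may not pick up the change.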