How to configure an UncaughtExceptionHandler in Spring Kafka Stream


I am using spring-kafka to implement a stream application with Spring Boot 1.5.16. The version of spring-kafka we are using is 1.3.8.RELEASE.

I am looking for a way to shut down the Boot application when an error terminates all the threads associated with Kafka Streams. I found that KafkaStreams lets you register a handler for uncaught exceptions. The method is setUncaughtExceptionHandler.

I saw that this method is exposed inside spring-kafka in the type KStreamBuilderFactoryBean.

My question is the following: is there a simple way to register the UncaughtExceptionHandler as a bean and let Spring inject it into the factory bean? Or should I create the KStreamBuilderFactoryBean myself and set the handler manually?

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
    final KStreamBuilderFactoryBean streamBuilderFactoryBean = new KStreamBuilderFactoryBean(
            streamsConfig);
    streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
        // Something happens here
    });
    return streamBuilderFactoryBean;
}

Thanks a lot.


Solution

  • Yes. In that old version you have to declare the KStreamBuilderFactoryBean bean yourself, with the appropriate injections and exactly that bean name, KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME.

    In later versions there is a StreamsBuilderFactoryBeanConfigurer, which lets us keep the auto-configured StreamsBuilderFactoryBean but still modify it in whatever way we need.

    UPDATE

    You just declare the configurer as a bean in your application context, and the framework will pick it up and apply it to the StreamsBuilderFactoryBean:

        @Bean
        StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
            return new StreamsBuilderFactoryBeanConfigurer() {
    
                @Override
                public void configure(StreamsBuilderFactoryBean factoryBean) {
                    factoryBean.setCloseTimeout(...);
                }
    
                @Override
                public int getOrder() {
                    return Integer.MAX_VALUE;
                }
    
            };
        }
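
For the "// Something happens here" part of the handler in the question, the following is a minimal sketch of a plain java.lang.Thread.UncaughtExceptionHandler that runs a shutdown action at most once. The class ShutdownOnStreamFailure and the method handlerFor are hypothetical names made up for this illustration and are not part of spring-kafka or Kafka Streams. The shutdown action is left to the caller; in a Spring Boot application it might call SpringApplication.exit(context), but that wiring is an assumption and is not shown here.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper for illustration; not part of spring-kafka or Kafka Streams.
final class ShutdownOnStreamFailure {

    private ShutdownOnStreamFailure() {
    }

    // Builds a handler that logs the failure and runs the given shutdown
    // action at most once, even if several threads fail one after another.
    static Thread.UncaughtExceptionHandler handlerFor(Runnable shutdownAction) {
        AtomicBoolean triggered = new AtomicBoolean(false);
        return (thread, exception) -> {
            System.err.println("Thread " + thread.getName() + " died: " + exception);
            // compareAndSet succeeds only for the first failure that reaches it.
            if (triggered.compareAndSet(false, true)) {
                shutdownAction.run();
            }
        };
    }

    // Small demo: a worker thread stands in for a failing stream thread.
    public static void main(String[] args) throws InterruptedException {
        Thread.UncaughtExceptionHandler handler =
                handlerFor(() -> System.out.println("shutting down"));
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated stream failure");
        }, "fake-stream-thread");
        worker.setUncaughtExceptionHandler(handler);
        worker.start();
        worker.join();
    }
}
```

In the setup from the question, the returned handler could be passed to streamBuilderFactoryBean.setUncaughtExceptionHandler(...). Because the handler runs on the failing thread, the shutdown action may be better off handing the actual close to a separate thread, so that closing the application does not wait on the thread that is already terminating.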