I am using spring-kafka to implement a streams application with Spring Boot 1.5.16. The spring-kafka version we are using is 1.3.8.RELEASE.
I am looking for a way to shut down the Boot application when an error terminates all the threads associated with Kafka Streams. I found that KafkaStreams lets you register a handler for uncaught exceptions. The method is setUncaughtExceptionHandler.
I saw that this method is exposed in spring-kafka on the KStreamBuilderFactoryBean type.
My question is the following. Is there a simple way to register the UncaughtExceptionHandler as a bean and let Spring inject it into the factory bean? Or should I create the KStreamBuilderFactoryBean myself and set the handler manually?
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME)
public KStreamBuilderFactoryBean kStreamBuilderFactoryBean(StreamsConfig streamsConfig) {
    final KStreamBuilderFactoryBean streamBuilderFactoryBean =
            new KStreamBuilderFactoryBean(streamsConfig);
    streamBuilderFactoryBean.setUncaughtExceptionHandler((threadInError, exception) -> {
        // Something happens here
    });
    return streamBuilderFactoryBean;
}
Thanks a lot.
Yes. In that old version you have to declare the KStreamBuilderFactoryBean bean yourself, with the appropriate injections and exactly that KafkaStreamsDefaultConfiguration.DEFAULT_KSTREAM_BUILDER_BEAN_NAME name.
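As for what the handler itself could do, here is a minimal plain-JDK sketch. It assumes the factory bean's setter accepts a standard Thread.UncaughtExceptionHandler, as the lambda in the question suggests. The class name ShutdownOnStreamFailure, the helper onceOnly and the shutdownAction callback are hypothetical names used only for illustration; they are not part of spring-kafka. The handler runs the shutdown action at most once, even if several stream threads die around the same time.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownOnStreamFailure {

    // Hypothetical helper: wraps a shutdown action in a handler that runs it
    // at most once, no matter how many threads report a failure.
    static Thread.UncaughtExceptionHandler onceOnly(Runnable shutdownAction) {
        AtomicBoolean triggered = new AtomicBoolean(false);
        return (threadInError, exception) -> {
            System.err.println("Thread " + threadInError.getName() + " died: " + exception);
            // Only the first failing thread flips the flag and triggers the shutdown.
            if (triggered.compareAndSet(false, true)) {
                shutdownAction.run();
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the real shutdown; in a Boot application this is where
        // the application context would be closed (assumption, not shown here).
        Thread.UncaughtExceptionHandler handler =
                onceOnly(() -> System.out.println("shutting down"));

        // A plain thread that fails, standing in for a Kafka Streams thread.
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("boom");
        }, "fake-stream-thread");
        worker.setUncaughtExceptionHandler(handler);
        worker.start();
        worker.join();
    }
}
```

In the bean definition from the question, the result of onceOnly(...) would be passed to setUncaughtExceptionHandler. Keep in mind that the handler runs on the dying thread, so a real shutdown action may need to hand the work off to a separate thread rather than block inside the handler.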
In later versions there is a StreamsBuilderFactoryBeanConfigurer, which lets you keep the auto-configured StreamsBuilderFactoryBean and still modify it in whatever way you need.
UPDATE
You just declare it as a bean in your application context, and the framework picks it up and applies it to the StreamsBuilderFactoryBean:
@Bean
StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
    return new StreamsBuilderFactoryBeanConfigurer() {

        @Override
        public void configure(StreamsBuilderFactoryBean factoryBean) {
            factoryBean.setCloseTimeout(...);
        }

        @Override
        public int getOrder() {
            return Integer.MAX_VALUE;
        }

    };
}