Search code examples
java · spring-boot · kotlin · apache-kafka · spring-kafka

Kafkalistener SpringBoot failsafe


I'm running Spring Boot with a KafkaListener as my client. The question is how we can recover from a failed Kafka configuration and prevent the application from stopping with "Process finished with exit code 0". An example of an incorrect config would be, e.g., an incorrect endpoint URL. The same scenario would apply if the Kafka server were not reachable. In any case, the KafkaListener process should never kill the server.

 /**
  * Creates the consumer factory used by the Kafka listener containers.
  *
  * Connects to the bootstrap address held in `server` over SASL_SSL with the PLAIN
  * mechanism. The JAAS password is a connection string built from the host part of
  * `server` (everything before the first ':') and `sharedSecret`.
  * NOTE(review): the `$ConnectionString` user name and `sb://` endpoint look like the
  * Azure Event Hubs Kafka endpoint convention; confirm against the target broker.
  *
  * Keys and values are consumed as plain strings.
  *
  * @return a consumer factory producing `String`/`String` consumers
  */
@Bean
open fun consumerFactory(): ConsumerFactory<String, String> {
    // Only the host name goes into the connection string; the port is dropped.
    val host = server.substringBefore(":")

    val props: MutableMap<String, Any> = HashMap()
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
    props[ConsumerConfig.GROUP_ID_CONFIG] = "group"
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
    props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
    props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
    props[SaslConfigs.SASL_MECHANISM] = "PLAIN"
    // "\$ConnectionString" is a literal user name, hence the escaped dollar sign.
    props[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"\$ConnectionString\" " +
            "password=\"Endpoint=sb://$host/;" +
            "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=$sharedSecret\";"

    return DefaultKafkaConsumerFactory(props, StringDeserializer(), StringDeserializer())
}


/**
 * Creates the listener container factory referenced by `@KafkaListener` methods.
 *
 * The factory is backed by [consumerFactory] and converts incoming payloads with a
 * [BytesJsonMessageConverter].
 *
 * @return the configured container factory
 */
@Bean
open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>? =
    ConcurrentKafkaListenerContainerFactory<String, String>().also { containerFactory ->
        containerFactory.consumerFactory = consumerFactory()
        containerFactory.setMessageConverter(BytesJsonMessageConverter())
    }

 /**
  * Kafka listener for the "topic" topic in consumer group "group", created through the
  * "kafkaListenerContainerFactory" container factory bean.
  *
  * @param thing the message payload, converted to a [Thing]
  * @param record the consumer record the payload was taken from
  * @param headers the message headers
  */
 @KafkaListener(topics = ["topic"],
        groupId = "group",
        containerFactory = "kafkaListenerContainerFactory",
)
fun listenThingsChanged(@Payload thing: Thing,
                        record: ConsumerRecord<String, String>,
                        @Headers headers: MessageHeaders) {

    // Handler body elided in the original snippet.
    ....
}

 

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895) at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) at de.x.ServerAppKt.main(ServerApp.kt:11) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at 
java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:825) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267) at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:606) at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:302) at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204) at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312) at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257) at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ... 
19 common frames omitted Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89) at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48) at org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:737) ... 33 common frames omitted


Solution

  • If the broker is just down, the application will start fine (with versions earlier than 2.3.4 you had to set missingTopicsFatal to false on the container properties; it has been false by default since then).

    No resolvable bootstrap urls given in...

    This is fatal - it is irrecoverable.

    However, you can set autoStartup=false - either on the @KafkaListener or on the container factory.

    This will prevent Spring from trying to start the containers during application initialization.

    You can then start the containers yourself in a try/catch block.