
How to handle AWS Secrets Manager rotation in a Spring Boot project for Kafka consumers with a SASL authentication strategy

I have a Spring Boot application with the following configuration for Kafka consumers:

class KafkaConsumerConfig(
    @Value("\${aws.secret-manager.sasl-auth.secret-name}") private val kafkaAuthSecretName: String,
    private val kafkaProperties: KafkaProperties,
    private val awsSecretManagerAdaptor: AwsSecretManagerAdaptor,
    private val applicationContext: ApplicationContext
) {

    private val logger = KotlinLogging.logger { }

    fun kafkaListenerContainerFactory():
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        factory.containerProperties.isMissingTopicsFatal = false
        factory.setErrorHandler { exception, data ->
            logger.error("Error in process with Exception {} and the record is {}", exception, data)
        }
        return factory
    }

    fun consumerFactory(): ConsumerFactory<String, String> {
        return DefaultKafkaConsumerFactory(consumerConfig())
    }

    fun consumerConfig(): Map<String, Any> {
        val props = kafkaProperties.buildConsumerProperties()
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] =
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] =

        val secretValue = awsSecretManagerAdaptor.getSecretValue(kafkaAuthSecretName)
        val username = getUsername()
        val saslJaasConfig =
            """ required username="$username" password="$secretValue";"""
        props[SaslConfigs.SASL_JAAS_CONFIG] = saslJaasConfig

        return props
    }

    private fun getUsername(): String? {
        val secretTags = awsSecretManagerAdaptor.getSecretTags(kafkaAuthSecretName)
        return secretTags.firstOrNull { it.key().equals("username") }?.value()
    }

    private fun retryTemplate(): RetryTemplate {
        val retryTemplate = RetryTemplate()

        return retryTemplate
    }

    private fun getFixedBackOffPolicy(): BackOffPolicy {
        val fixedBackOffPolicy = FixedBackOffPolicy()
        fixedBackOffPolicy.backOffPeriod = 3000
        return fixedBackOffPolicy
    }

    private fun getSimpleRetryPolicy(): SimpleRetryPolicy {
        val simpleRetryPolicy = SimpleRetryPolicy()
        simpleRetryPolicy.maxAttempts = 3
        return simpleRetryPolicy
    }
}

The Kafka server uses SASL authentication with a username and password. As you can see, both are fetched from AWS Secrets Manager through a service called AwsSecretManagerAdaptor. The configuration works like a charm. However, once the secret is rotated and the Kafka consumers are restarted, SASL authentication fails. As a workaround, I currently restart the Spring Boot application so that it reads the rotated secret from AWS Secrets Manager again.

The workaround works, but restarting the application is ugly and error-prone. Do you have any suggestions for a better approach?


  • Kafka supports custom SASL callback handlers. You can override the default SASL client callback handler so that the name and password callbacks return the username and password from AWS Secrets Manager, without reconstructing the consumer or producer.

    Check this

    You can extend the class ‘’ and provide your own implementation for NameCallback or PasswordCallback.
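A minimal sketch of the callback-handler idea, under some assumptions: it implements Kafka's `AuthenticateCallbackHandler` interface directly, and the names `SecretsManagerCallbackHandler` and `SaslCredentials` are hypothetical, not taken from the question or from any library. Kafka instantiates the handler reflectively, so the credential lookup is passed through a holder object rather than a constructor argument. Which callbacks actually arrive depends on the SASL mechanism, so treat this as a starting point rather than a finished implementation.

```kotlin
import javax.security.auth.callback.Callback
import javax.security.auth.callback.NameCallback
import javax.security.auth.callback.PasswordCallback
import javax.security.auth.callback.UnsupportedCallbackException
import javax.security.auth.login.AppConfigurationEntry
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler

// Hypothetical holder for the credential lookup. The application would set it
// once at startup, e.g. to a lambda that asks AwsSecretManagerAdaptor for the
// current username and password.
object SaslCredentials {
    @Volatile
    var lookup: () -> Pair<String, String> = { error("SaslCredentials.lookup is not set") }
}

// Hypothetical handler: answers the name and password callbacks with whatever
// the lookup returns at the moment of the call.
class SecretsManagerCallbackHandler : AuthenticateCallbackHandler {

    override fun configure(
        configs: Map<String, *>,
        saslMechanism: String,
        jaasConfigEntries: List<AppConfigurationEntry>
    ) {
        // Nothing to configure: credentials are resolved in handle().
    }

    override fun handle(callbacks: Array<out Callback>) {
        // One lookup per invocation, so a rotated secret is used the next
        // time the client authenticates.
        val (username, password) = SaslCredentials.lookup()
        for (callback in callbacks) {
            when (callback) {
                is NameCallback -> callback.name = username
                is PasswordCallback -> callback.password = password.toCharArray()
                else -> throw UnsupportedCallbackException(callback)
            }
        }
    }

    override fun close() {
        // No resources to release.
    }
}
```

To try it, the handler class would be registered in `consumerConfig()` under `SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS` (the `sasl.client.callback.handler.class` property), with `SaslCredentials.lookup` set before the first consumer starts. As far as I know, the JAAS config still has to name a login module. Because the lookup runs on every authentication attempt, a short-lived cache in front of the Secrets Manager call is probably worth considering.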