Search code examples
spring-bootapache-kafkaspring-kafkaspring-integration-dsl

Consumer stopped after authorizationException and never recover


I am using Spring Integration with spring-Kafka so I have created custom listener's to read messages from topics and put them on the spring integration channel. After certain interval say like 10-12 hours pod restarts and Application start's successfully with no error but after sometime.

Note:- This kafka_outgoing_topic does not belongs to my microservice and is not present in my code anywhere.

I see below error in logs:-


org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [kafka_outgoing_topic]
"
"2022-08-13T09:24:12.617+0000","2022-08-13 09:24:01.975","1 --- [customlistner-C-1] essageListenerContainer$ListenerConsumer : Authorization Exception and no authorizationExceptionRetryInterval set

org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [kafka_outgoing_topic]
"

 [customlistner-C-1] essageListenerContainer$ListenerConsumer : group-id : Consumer stopped.

After this point of time my custom listener never starts. 

Below is my code:-

@Bean("ConsumerBean")
    public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
        KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>(listener());
        kafkaMessageDrivenChannelAdapter.setOutputChannel(messageFromKafka());
        kafkaMessageDrivenChannelAdapter.setAutoStartup(true);
        return kafkaMessageDrivenChannelAdapter;
    }
@Bean("customlistner")
    public ConcurrentMessageListenerContainer<String, String> listener()  {
        ContainerProperties properties = new ContainerProperties(kafkaTopic);
        properties.setGroupId("group-id");
        return (ConcurrentMessageListenerContainer<String, String>) new ConcurrentMessageListenerContainer<>(consumerFactory, properties);
         
    }


Solution

  • Try to follow that Authorization Exception and no authorizationExceptionRetryInterval set error recommendation.

    See docs for more info: https://docs.spring.io/spring-kafka/reference/html/#kafka-container

    As of version 2.8, a new container property authExceptionRetryInterval has been introduced. This causes the container to retry fetching messages after getting any AuthenticationException or AuthorizationException from the KafkaConsumer. This can happen when, for example, the configured user is denied access to read a certain topic or credentials are incorrect. Defining authExceptionRetryInterval allows the container to recover when proper permissions are granted.

    Sounds like not only your pod is restarted, but some other which is responsible for Apache Kafka broker and you have to wait sometime until it recreates all the topics you are interested in.