I am using Spring Integration with spring-kafka, and I have created custom listeners that read messages from topics and put them on a Spring Integration channel. After a certain interval, roughly 10-12 hours, the pod restarts and the application starts successfully with no errors, but after some time I see the error below in the logs.
Note: kafka_outgoing_topic does not belong to my microservice and is not present anywhere in my code.
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [kafka_outgoing_topic]
2022-08-13 09:24:01.975 1 --- [customlistner-C-1] essageListenerContainer$ListenerConsumer : Authorization Exception and no authorizationExceptionRetryInterval set
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [kafka_outgoing_topic]
[customlistner-C-1] essageListenerContainer$ListenerConsumer : group-id : Consumer stopped.
After this point, my custom listener never starts again.
Below is my code:
@Bean("ConsumerBean")
public KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter() {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(listener());
    kafkaMessageDrivenChannelAdapter.setOutputChannel(messageFromKafka());
    kafkaMessageDrivenChannelAdapter.setAutoStartup(true);
    return kafkaMessageDrivenChannelAdapter;
}

@Bean("customlistner")
public ConcurrentMessageListenerContainer<String, String> listener() {
    ContainerProperties properties = new ContainerProperties(kafkaTopic);
    properties.setGroupId("group-id");
    return new ConcurrentMessageListenerContainer<>(consumerFactory, properties);
}
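Try following the recommendation in the error message, "Authorization Exception and no authorizationExceptionRetryInterval set": configure a retry interval for authorization exceptions on the listener container so that it retries instead of stopping.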
See docs for more info: https://docs.spring.io/spring-kafka/reference/html/#kafka-container
As of version 2.8, a new container property authExceptionRetryInterval has been introduced. This causes the container to retry fetching messages after getting any AuthenticationException or AuthorizationException from the KafkaConsumer. This can happen when, for example, the configured user is denied access to read a certain topic or credentials are incorrect. Defining authExceptionRetryInterval allows the container to recover when proper permissions are granted.
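As a minimal sketch of what that could look like for the container in the question, assuming spring-kafka 2.8 or later: the class name ListenerConfig, the topic name "my-topic", and the 30-second interval are placeholders, not values from the question. On older versions, where the log message mentions authorizationExceptionRetryInterval, the corresponding setter should be setAuthorizationExceptionRetryInterval, so check the Javadoc for the version you actually run.

```java
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class ListenerConfig { // hypothetical class name

    @Bean("customlistner")
    public ConcurrentMessageListenerContainer<String, String> listener(
            ConsumerFactory<String, String> consumerFactory) {
        // "my-topic" stands in for the kafkaTopic field used in the question.
        ContainerProperties properties = new ContainerProperties("my-topic");
        properties.setGroupId("group-id");
        // Without this property the consumer stops on the first authorization
        // or authentication exception. With it, the container keeps retrying
        // at the given interval. 30 seconds is an arbitrary example value.
        properties.setAuthExceptionRetryInterval(Duration.ofSeconds(30));
        return new ConcurrentMessageListenerContainer<>(consumerFactory, properties);
    }
}
```

With the interval set, the container should resume consuming on its own once the broker grants access again, rather than staying stopped until the next pod restart.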
It sounds like not only your pod was restarted, but also another one that is responsible for the Apache Kafka broker, so you have to wait some time until it recreates all the topics you are interested in.