My program uses Spring Boot with Spring for Apache Kafka. The code is as follows:
@Configuration
public class KafkaConfiguration {

    @Bean
    public KafkaListenerContainerFactory<?> batchFactory(KafkaProperties properties) {
        // Container factory that delivers records to listeners in batches
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProperties));
        factory.setBatchListener(true);
        return factory;
    }
}
@Configuration
@Slf4j
public class KafkaMsgConsumer {

    @Autowired
    private KafkaMsgConsumerService kafkaMsgConsumerService;
    @Autowired
    private SysTodoMiddleUtil sysTodoMiddleUtil;
    @Autowired
    private SysTodoServiceImpl sysTodoServiceImpl;

    @KafkaListener(topics = "${topicName}", groupId = "act")
    public void consumer(String message) {
        System.out.println("kafka consumer message: " + message);
        kafkaMsgConsumerService.handle(message);
    }

    @KafkaListener(topics = "${topicProfile}" + "eventTodoCreate", groupId = "event", containerFactory = "batchFactory")
    public void eventTodoCreateConsumer(ConsumerRecords<String, String> records) {
        if (records.isEmpty()) {
            return;
        }
        log.info("consumer message count: " + records.count());
        List<SysTodo> eventTodos = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            log.info("eventTodoCreate consumer message: " + record);
            SysTodo eventTodo = JSONObject.parseObject(record.value(), SysTodo.class);
            eventTodos.add(eventTodo);
        }
        sysTodoServiceImpl.insertBatch(eventTodos);
    }

    @KafkaListener(topics = "${topicProfile}" + "eventTodoFinish", groupId = "event")
    public void eventTodoFinishConsumer(String message) {
        log.info("eventTodoFinish consumer message: " + message);
        EventFinishVo eventFinishVo = JSONObject.parseObject(message, EventFinishVo.class);
        sysTodoMiddleUtil.handleFinishMessage(eventFinishVo);
    }
}
There was no problem in earlier tests, but in the latest retest the service ran out of memory (OOM) a few hours after deployment. Analyzing the heap dump with MAT (Eclipse Memory Analyzer) showed an obvious memory leak.
Looking at the thread stacks, I found a large number of kafka-admin-client-thread threads.
My guess is that something in the consumption process kept these threads from being released, ultimately causing the OOM. But my Kafka code logic is very simple and I can't see any problem with it, and there are no related errors in my error log.
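To sanity-check that guess at runtime, a quick diagnostic like the one below can be dropped into any bean (a hypothetical helper, not part of the original project); it simply counts live JVM threads whose names contain the prefix seen in the dump:

// Hypothetical diagnostic helper: counts live threads whose name contains
// the "kafka-admin-client-thread" prefix observed in the heap/thread dump.
private long countAdminClientThreads() {
    return Thread.getAllStackTraces().keySet().stream()
            .filter(t -> t.getName().contains("kafka-admin-client-thread"))
            .count();
}

If that number keeps growing over time, the AdminClient threads really are piling up rather than being shut down.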
Thanks to Gary's answer, I found this code in my project:
private boolean topicIsExist(String topicName) {
    boolean isExist = false;
    if (StringUtils.isBlank(topicName)) {
        log.error("topicName is null");
        return isExist;
    }
    // A new AdminClient is created on every call and never closed
    AdminClient client = AdminClient.create(kafkaAdmin.getConfig());
    ListTopicsResult listTopicsResult = client.listTopics();
    try {
        Set<String> names = listTopicsResult.names().get();
        if (names.contains(topicName)) {
            isExist = true;
        }
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
    return isExist;
}
Every time a message is produced, this method is called first to check whether the topic exists, and the AdminClient is never closed after use. Each call therefore leaves behind a live AdminClient and its kafka-admin-client-thread, which is exactly what piled up in the dump. After correcting this issue, I ran several more tests and did not see any memory leaks.
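For reference, here is a minimal sketch of the corrected check. AdminClient implements AutoCloseable, so try-with-resources is enough to make sure the client and its background thread are shut down; the kafkaAdmin field, log, and StringUtils are assumed to be the same as in the snippet above.

private boolean topicIsExist(String topicName) {
    if (StringUtils.isBlank(topicName)) {
        log.error("topicName is null");
        return false;
    }
    // try-with-resources closes the AdminClient (and stops its admin-client thread)
    // even if the lookup throws
    try (AdminClient client = AdminClient.create(kafkaAdmin.getConfig())) {
        Set<String> names = client.listTopics().names().get();
        return names.contains(topicName);
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
}

A cheaper alternative would be to create a single AdminClient bean and reuse it for every check, rather than creating and closing a new client for each produced message.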