In my Spring Boot project where I have a number of Spring Kafka consumers, I have added a number of event listeners to monitor the health of these consumers. Here is the code:
import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.ConsumerPausedEvent;
import org.springframework.kafka.event.ConsumerResumedEvent;
import org.springframework.kafka.event.ConsumerStoppedEvent;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
import org.springframework.stereotype.Component;

// LOGGER_ERROR, LOG_MSG_DELIMITER and the message-prefix constants are defined elsewhere in the project.
@Component
public class ApplicationContextListeningService {

    @EventListener
    public void handleConsumerPausedEvent(ConsumerPausedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERPAUSEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerResumedEvent(ConsumerResumedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERRESUMEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerStoppedEvent(ConsumerStoppedEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_CONSUMERSTOPPEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleListenerContainerIdleEvent(ListenerContainerIdleEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_LISTENERCONTAINERIDLEEVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleNonResponsiveConsumerEvent(NonResponsiveConsumerEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_NONRESPONSIVECONSUMEREVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }
}
Does anyone know under what circumstances these events are published (and maybe how I can trigger them manually for testing purposes)? Also, for the last three events (ConsumerStoppedEvent, ListenerContainerIdleEvent, and NonResponsiveConsumerEvent): when I get one of these, is human intervention needed to address the issue (such as restarting the servers so that the consumers are created again)? Thanks!
You can emulate them all by injecting a mock consumer factory into the container.

ConsumerStoppedEvent is emitted when you stop() the container.

ListenerContainerIdleEvent just means no records have been received in the idleEventInterval, so it usually doesn't mean there's a problem.

NonResponsiveConsumerEvent: it's hard to say. With older clients, poll() would block if the server was down, so we couldn't emit idle events (or do anything). I don't know if you can still get them with more recent clients, but to simulate it you just need to block in the mock consumer's poll() method for long enough for the monitor task to detect the problem and emit the event.
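
To make the blocked-poll idea concrete, here is a minimal plain-Java sketch of the mechanism. It is only a model under stated assumptions, not Spring Kafka's implementation: FakeConsumer, run(), and the monitorIntervalMs / noPollThresholdMs parameters are all hypothetical names made up for this illustration. One thread polls in a loop, a separate monitor task periodically checks how long ago the last poll started, and it records an "event" once that exceeds the threshold.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Plain-Java model of the idea only; none of these types are Spring Kafka classes.
class BlockedPollSketch {

    /** Hypothetical stand-in for a consumer whose poll() can be made to block. */
    interface FakeConsumer {
        void poll() throws InterruptedException;
    }

    /**
     * Runs a poll loop on one thread and a monitor task on another for runForMs,
     * and returns the messages the monitor recorded while no poll was started.
     */
    static List<String> run(FakeConsumer consumer, long monitorIntervalMs,
                            long noPollThresholdMs, long runForMs) {
        List<String> events = new CopyOnWriteArrayList<>();
        AtomicLong lastPoll = new AtomicLong(System.nanoTime());

        Thread pollThread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    lastPoll.set(System.nanoTime()); // record the time just before polling
                    consumer.poll();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop polling when asked to
            }
        });
        pollThread.setDaemon(true);

        // The monitor runs independently of the poll thread, so it still works
        // while poll() is blocked.
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        monitor.scheduleAtFixedRate(() -> {
            long sinceLastPollMs =
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastPoll.get());
            if (sinceLastPollMs > noPollThresholdMs) {
                events.add("non-responsive: " + sinceLastPollMs + " ms since last poll");
            }
        }, monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);

        pollThread.start();
        try {
            Thread.sleep(runForMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        monitor.shutdownNow();
        pollThread.interrupt();
        return events;
    }

    public static void main(String[] args) {
        // Healthy consumer: poll() returns quickly, so the monitor should stay quiet.
        List<String> healthy = run(() -> Thread.sleep(10), 50, 200, 600);
        // Blocked consumer: poll() does not return during the run, so the monitor fires.
        List<String> blocked = run(() -> Thread.sleep(60_000), 50, 200, 600);
        System.out.println("healthy events: " + healthy.size());
        System.out.println("blocked events: " + blocked.size());
    }
}
```

In a real test you would get the same effect by having the mock consumer's poll() sleep (or wait on a latch) for longer than the container's monitor needs to notice, and then asserting that your NonResponsiveConsumerEvent listener was called.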