Search code examples
spring-kafkaspring-kafka-test

How to trigger and handle these Spring Kafka events


In my Spring Boot project where I have a number of Spring Kafka consumers, I have added a number of event listeners to monitor the health of these consumers. Here is the code:

@Component
public class ApplicationContextListeningService {

    @EventListener
    public void handleConsumerPausedEvent(ConsumerPausedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERPAUSEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerResumedEvent(ConsumerResumedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERRESUMEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerStoppedEvent(ConsumerStoppedEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_CONSUMERSTOPPEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleListenerContainerIdleEvent(ListenerContainerIdleEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_LISTENERCONTAINERIDLEEVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleNonResponsiveConsumerEvent(NonResponsiveConsumerEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_NONRESPONSIVECONSUMEREVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

}

Does anyone know under what circumstances these events will be thrown (and maybe how I can manually trigger these events for testing purposes)? And also for the last three events (ConsumerStoppedEvent, ListenerContainerIdleEvent, and NonResponsiveConsumerEvent), when I get one of these, is human intervention needed to address the issue (like restarting the servers to have the consumers created again)? Thanks!


Solution

  • You can emulate them all by injecting a Mock consumer factory into the container.

    • ConsumerStoppedEvent is emitted when you stop() the container.
    • ListenerContainerIdleEvent just means no records have been received in the idleEventInterval so it usually doesn't mean there's a problem.
    • NonResponsiveConsumerEvent - it's hard to say; with older clients the poll() would block if the server was down so we couldn't emit idle events (or do anything).

    I don't know if you can still get them with more recent clients; but to simulate it you just need to block in the mock consumer poll() method for long enough for the monitor task to detect the problem and emit the event.