Search code examples
spring-kafka

Spring Kafka Stop container on Exception


I am using ConcurrentMessageListenerContainer with auto commit set to false to consume messages from the topic and write to Database. If Database is down I need to stop the container from processing current record in a poll and don't do next poll(). I have implemented DataSourceHealthIndicator and after checking Database status to UP I want to restart my container again to process remaining records.

Any suggestion how can I stop processing remaining records and stop the container, I have tried to use consumer.close(). But it doesn't stop the process and kept throwing consumer is already closed.


Solution

  • Call container.stop(). If you are using @KafkaListeners, call stop() on the listener container registry which will stop all registered containers.

    EDIT

    Error handlers for automatically stopping the container have been available since the 2.1 release; at the time of writing the current version is 2.2.3.