spring-boot apache-kafka spring-kafka

Invoke ContainerStoppingErrorHandler based on exception type


I am using Spring Kafka 2.2.4.RELEASE and Kafka version 2.11, with ContainerStoppingErrorHandler as my error handler. Whenever an exception occurs, its handle method is called and the container stops. I now need to stop the container only for certain exception types: if a DB exception occurs, the container should stop; for any other exception type, an email should be sent to the group. Below is my error handler code:

public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> messageKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
    //consumer configs...
    factory.setErrorHandler(new ContainerStoppingErrorHandler() {
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                MessageListenerContainer container) {

            if (thrownException instanceof ConnectionException) {
                // DB exception: let the parent handler stop the container
                LOGGER.error("Database exception occurred, stopping the container");
                super.handle(thrownException, records, consumer, container);
            } else {
                //send email about error without discarding the records
            }
        }
    });
    return factory;
}

I am able to stop the container on a DB exception, but for other exceptions the records from the poll, including the failed record, are discarded, so I am losing data. Is there a way to handle exceptions by type: invoke the container-stopping error handler for a DB exception, and otherwise continue without discarding the remaining records?


Solution

  • For other exceptions, delegate to a SeekToCurrentErrorHandler, which performs seeks on the topics for all the unprocessed records (including the failed record) so that they are redelivered on the next poll().

    The SeekToCurrentErrorHandler (STCEH) gives up on the failed record after 10 attempts by default, but you can change that by setting the maxAttempts constructor argument.

    EDIT

    factory.setErrorHandler(new ContainerStoppingErrorHandler() {

        // Delegate for everything that should NOT stop the container
        private final SeekToCurrentErrorHandler stceh = new SeekToCurrentErrorHandler(...);

        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                MessageListenerContainer container) {

            // The exception thrown by the listener arrives wrapped, so check the cause
            if (thrownException.getCause() instanceof ConnectionException) {
                LOGGER.error("Database exception occurred, stopping the container");
                super.handle(thrownException, records, consumer, container);
            } else {
                //send email about error without discarding the records
                // Seek back so the unprocessed records are redelivered on the next poll()
                this.stceh.handle(thrownException, records, consumer, container);
            }
        }
    });
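
The getCause() check above only looks one level down. If the database exception might be wrapped more deeply (for example by a service or DAO layer), a small helper that walks the whole cause chain could replace the instanceof test. This is only a minimal sketch in plain Java, with no Spring Kafka dependency: CauseChain, hasCause and DbConnectionException are hypothetical names, the last one standing in for the ConnectionException used in the question.

```java
final class CauseChain {

    /** Hypothetical stand-in for the ConnectionException from the question. */
    static class DbConnectionException extends RuntimeException {
        DbConnectionException(String message) {
            super(message);
        }
    }

    /**
     * Returns true if the given throwable, or any throwable in its cause
     * chain, is an instance of the requested type.
     */
    public static boolean hasCause(Throwable thrown, Class<? extends Throwable> type) {
        // Bound the walk so that a cyclic cause chain cannot loop forever.
        int depth = 0;
        for (Throwable t = thrown; t != null && depth < 32; t = t.getCause(), depth++) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // DB failure wrapped twice, as it might be by intermediate layers
        Exception wrapped = new RuntimeException("listener failed",
                new IllegalStateException("dao failed",
                        new DbConnectionException("connection refused")));
        System.out.println(hasCause(wrapped, DbConnectionException.class)); // prints true

        // Unrelated failure: no DB exception anywhere in the chain
        Exception other = new RuntimeException("bad payload");
        System.out.println(hasCause(other, DbConnectionException.class)); // prints false
    }
}
```

Inside the error handler, the condition would then read something like hasCause(thrownException, ConnectionException.class), leaving the stop-or-delegate branches unchanged.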