I am using Spring Kafka version 2.2.4.RELEASE and Kafka version 2.11, with ContainerStoppingErrorHandler as my error handler. Whenever the listener throws an exception, this handler is called and stops the container. I now need to decide what to do based on the exception type: if a DB exception occurs, the container should stop; for any other exception type, an email should be sent to the group. Below is my error handler code:
public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> messageKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    //consumer configs...
    factory.setErrorHandler(new ContainerStoppingErrorHandler() {
        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                MessageListenerContainer container) {
            if (thrownException instanceof ConnectionException) {
                LOGGER.error("Database exception occurred, stopping the container");
                // Let ContainerStoppingErrorHandler stop the container
                super.handle(thrownException, records, consumer, container);
            } else {
                //send email about error without discarding the records
            }
        }
    });
    return factory;
}
I am able to stop the container on a DB exception, but for other exceptions the records from the poll, including the failed record, are discarded, so I am losing data. Is there any way to handle exceptions by type, so that a DB exception stops the container while any other exception lets processing continue without discarding the remaining records?
For other exceptions, delegate to a SeekToCurrentErrorHandler, which performs seeks on the topics for all the unprocessed records (including the failed record) so that they are redelivered on the next poll().
The STCEH gives up on the failed record after 10 attempts by default, but you can change that by setting the maxFailures constructor argument.
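To make the redelivery behaviour concrete, here is a toy model in plain Java. It is not spring-kafka code: the class RedeliverySimulation, its run method, and the single-record "poll" are all assumptions made for illustration, and the library's exact failure counting may differ. It only shows the idea that a failure leaves the position on the failed offset, so that offset is delivered again until a failure limit is reached.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model of seek-and-redeliver semantics (hypothetical, not spring-kafka code). */
final class RedeliverySimulation {

    private RedeliverySimulation() {
    }

    /**
     * Delivers offsets 0..recordCount-1 to the listener one at a time.
     * When the listener throws, the position stays on the failed offset,
     * so that offset is delivered again. Once the same offset has failed
     * maxFailures times it is skipped. Returns the skipped offsets.
     */
    static List<Integer> run(int recordCount, int maxFailures, Consumer<Integer> listener) {
        Map<Integer, Integer> failures = new HashMap<>();
        List<Integer> skipped = new ArrayList<>();
        int position = 0; // next offset to deliver, like the consumer position
        while (position < recordCount) {
            try {
                listener.accept(position);
                position++; // success: move on to the next offset
            } catch (RuntimeException e) {
                int count = failures.merge(position, 1, Integer::sum);
                if (count >= maxFailures) {
                    skipped.add(position); // give up on this offset
                    position++;
                }
                // otherwise leave position unchanged: the "seek to current" step
            }
        }
        return skipped;
    }
}
```

In this model nothing after the failed offset is lost: later offsets are delivered once the failing one either succeeds or is skipped.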
EDIT
factory.setErrorHandler(new ContainerStoppingErrorHandler() {

    private final SeekToCurrentErrorHandler stceh = new SeekToCurrentErrorHandler(...);

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
            MessageListenerContainer container) {
        // The container wraps the listener's exception, so test the cause
        if (thrownException.getCause() instanceof ConnectionException) {
            LOGGER.error("Database exception occurred, stopping the container");
            super.handle(thrownException, records, consumer, container);
        } else {
            //send email about error without discarding the records
            // Seek back so the unprocessed records are redelivered
            this.stceh.handle(thrownException, records, consumer, container);
        }
    }
});
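The getCause() check above looks only at the immediate cause. If the DB exception might be nested more deeply (for example, wrapped again by a DAO layer), one option is to walk the whole cause chain. The following is a minimal sketch in plain Java; the Causes class and its hasCause method are hypothetical names, not part of spring-kafka.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper for inspecting a cause chain (not part of spring-kafka). */
final class Causes {

    private Causes() {
    }

    /**
     * Returns true if the throwable itself, or any throwable in its cause
     * chain, is an instance of the given type.
     */
    static boolean hasCause(Throwable thrown, Class<? extends Throwable> type) {
        // Remember visited throwables to guard against a circular cause chain.
        Set<Throwable> seen = new HashSet<>();
        for (Throwable t = thrown; t != null && seen.add(t); t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }
}
```

With such a helper, the condition in the handler could become Causes.hasCause(thrownException, ConnectionException.class), assuming ConnectionException is the exception type your DB layer throws.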