I have gone through multiple posts, but most of them are about handling bad messages, not about exception handling while processing them.
I want to know how to handle messages that have been received by the stream application when an exception occurs while processing them. The exception could have multiple causes, such as a network failure, a RuntimeException, etc.
Should I use setUncaughtExceptionHandler, or is there a better way?
It depends on what you want to do with exceptions on the producer side.
If an exception is thrown on the producer side (e.g. due to a network failure or because the Kafka broker has died), the stream dies by default. Since kafka-streams 1.1.0 you can override this default behavior by implementing ProductionExceptionHandler, like the following:
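    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
        private static final Logger log = LoggerFactory.getLogger(CustomProductionExceptionHandler.class); // assumes SLF4J
        @Override
        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                         final Exception exception) {
            // The value can be null (e.g. a tombstone record), so guard before decoding it.
            final String value = record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
            log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]", value, record.topic(), exception);
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        @Override
        public void configure(final Map<String, ?> configs) {
        }
    }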
From the handle method you can return either CONTINUE, if you don't want the stream to die on an exception, or FAIL, if you want the stream to stop (FAIL is the default). You also need to specify this class in the stream config:
default.production.exception.handler=com.example.CustomProductionExceptionHandler
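If you build the stream config in code rather than in a properties file, the same setting can be sketched as follows. This is only a minimal sketch: the class name StreamsConfigSketch, the application id, and the bootstrap servers are placeholders I made up for illustration, and only the handler key and class name come from the config line above.

```java
import java.util.Properties;

final class StreamsConfigSketch {
    // Hypothetical helper: builds the properties you would pass to the KafkaStreams constructor.
    static Properties buildConfig() {
        Properties props = new Properties();
        // Placeholder values, replace them with your own.
        props.put("application.id", "example-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Same key as in the config line above; StreamsConfig also exposes it as
        // DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG.
        props.put("default.production.exception.handler",
                "com.example.CustomProductionExceptionHandler");
        return props;
    }
}
```

The sketch uses plain string keys so that it compiles without the Kafka jars; in a real application the StreamsConfig constants are usually the safer choice because they protect against typos.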
Also note that ProductionExceptionHandler handles only exceptions on the producer side; it does not handle exceptions thrown while processing a message in stream methods such as mapValues(..), filter(..), branch(..), etc. You need to wrap the logic of these methods in try / catch blocks (put all of the method logic inside the try block to guarantee that you handle all exceptional cases):
.filter((key, value) -> { try {..} catch (Exception e) {..} })
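To avoid repeating the try / catch in every lambda, the wrapping can be pulled into a small helper. The following is a rough sketch, not part of Kafka Streams: SafeOps and safeFilter are hypothetical names, java.util.function.BiPredicate is used as a stand-in for the Kafka Streams Predicate type so the example has no dependencies, and dropping the record on failure is just one possible policy.

```java
import java.util.function.BiPredicate;

final class SafeOps {
    // Hypothetical helper: wraps a predicate so that any exception thrown by it
    // is logged and the record is filtered out instead of killing the stream thread.
    static <K, V> BiPredicate<K, V> safeFilter(BiPredicate<K, V> delegate) {
        return (key, value) -> {
            try {
                return delegate.test(key, value);
            } catch (Exception e) {
                // Assumption: dropping the record is acceptable; you could also
                // forward it to a failure topic here.
                System.err.println("Dropping record with key " + key + ": " + e);
                return false;
            }
        };
    }
}
```

With a typed variable such as BiPredicate<String, String> safe = SafeOps.safeFilter((k, v) -> v.length() > 3); it should be possible to pass safe::test to .filter(..), since the Kafka Streams Predicate has the same test(key, value) shape.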
As far as I know, we don't need to handle exceptions on the consumer side explicitly, as Kafka Streams will automatically retry consuming later (the offset is not advanced until the messages have been consumed and processed). For example, if the Kafka broker is unreachable for some time, you will get exceptions from Kafka Streams, and when the broker is back up, Kafka Streams will consume all the messages. So in this case we just have a delay, and nothing is corrupted or lost.
With setUncaughtExceptionHandler you cannot change the default behavior the way you can with ProductionExceptionHandler; with it you can only log the error or send the message to a failure topic.
Update since kafka-streams 2.8.0
Since kafka-streams 2.8.0, you can automatically replace a failed stream thread (one that died because of an uncaught exception) by using the KafkaStreams method void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh); with StreamThreadExceptionResponse.REPLACE_THREAD. For more details, please take a look at Kafka Streams Specific Uncaught Exception Handler.
    kafkaStreams.setUncaughtExceptionHandler(ex -> {
        log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
    });