I have a simple Spring application based on Kafka Streams. It consumes messages from an incoming topic, applies a map
transformation, and prints each message. The KStream
is configured like this:
@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
                                        PrintAction printAction, String topicName) {
    KStream<String, JsonNode> source = builder.stream(topicName,
            Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
    // @formatter:off
    source
        .map(myTransformer)
        .foreach(printAction);
    // @formatter:on
    return source;
}
Inside MyTransformer
I call an external microservice, which may be down at that moment. If the call fails (typically by throwing a RuntimeException
), I can't do my transformation.
The question: is there any way to reprocess a message in a Streams application if an error happened during previous processing?
Based on my research so far, there is no way to do so. The only option I see is to push the message into a dead letter topic (DLT) and try to process it later; if it fails again, I push it into the DLT again and keep retrying this way.
If any uncaught exception happens during Kafka Streams processing, your stream will change its status to ERROR and stop consuming incoming messages for the partition on which the error occurred.
You need to catch exceptions yourself. Retries can be achieved in either of two ways: 1) use Spring's RetryTemplate
to invoke the external microservice (but keep in mind that this delays consumption of messages from that specific partition), or 2) push the failed message into another topic for later reprocessing (as you suggested).
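To illustrate how the two options can fit together, here is a minimal sketch in plain Java. It is a hand-rolled stand-in for Spring's RetryTemplate, not its actual API: the class RetryingCall, the method callWithRetry, and its parameters are hypothetical names chosen for this example. It retries a call a bounded number of times with a fixed pause, and returns an empty Optional when all attempts fail so the caller can decide to forward the message to another topic.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper, not part of Spring or Kafka Streams.
public class RetryingCall {

    /**
     * Invokes the call up to maxAttempts times, sleeping backoffMillis
     * between failed attempts. Returns Optional.empty() if every attempt
     * threw a RuntimeException (or if the call returned null).
     */
    public static <T> Optional<T> callWithRetry(Supplier<T> call,
                                                int maxAttempts,
                                                long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return Optional.ofNullable(call.get());
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    break; // attempts exhausted, give up
                }
                try {
                    Thread.sleep(backoffMillis); // blocks the calling thread
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    break;
                }
            }
        }
        return Optional.empty();
    }
}
```

Inside MyTransformer, the microservice call could be wrapped in callWithRetry; an empty result would then mark the message as failed so a later step can send it to the retry topic instead of throwing. Keep the total retry time small: the stream thread is blocked while waiting, and processing that takes longer than the consumer's max.poll.interval.ms causes the consumer to be removed from the group.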
Update: since kafka-streams 2.8.0, you can automatically replace a stream thread that failed because of an uncaught exception. Use the KafkaStreams
method void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);
and return StreamThreadExceptionResponse.REPLACE_THREAD
from the handler. For more details, please take a look at Kafka Streams Specific Uncaught Exception Handler.
kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});