java, apache-kafka, apache-kafka-streams, spring-kafka

Reprocessing a message when an error occurs while processing it in the Kafka Stream


I have a simple Spring application based on Kafka Streams that consumes messages from an incoming topic, applies a map transformation, and prints each message. The KStream is configured like this:

@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
         PrintAction printAction, String topicName) {
    // Consume String keys and JSON values from the incoming topic
    KStream<String, JsonNode> source = builder.stream(topicName,
                Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
    // @formatter:off
    source
        .map(myTransformer)     // KeyValueMapper; calls the external microservice
        .foreach(printAction);  // ForeachAction; prints each transformed record
    // @formatter:on
    return source;
}

Inside MyTransformer I call an external microservice, which may be down at times. If the call fails (typically by throwing a RuntimeException), I can't perform the transformation.

The question is: is there any way to reprocess a message in a Streams application if an error happened during its previous processing?

Based on my research so far, there is no way to do so. The only option I see is to push the message into a dead-letter topic (DLT) and try to process it again later; if it fails again, I push it into the DLT again, and retry that way.


Solution

  • If an uncaught exception is thrown during Kafka Streams processing, your stream will change its state to ERROR and stop consuming incoming messages from the partition on which the error occurred. You need to catch exceptions yourself. Retries can be achieved in one of two ways:
      1) use Spring RetryTemplate to invoke the external microservice (but keep in mind that consumption from that partition is delayed while the retries are in progress), or
      2) push the failed message into another topic for later reprocessing (as you suggested).
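The two options above can be combined: retry the call a few times, and if it still fails, return a failure marker instead of throwing, so the stream thread survives and the record can be sent to a dead-letter topic. The sketch below is plain Java standing in for Spring RetryTemplate (it is not the RetryTemplate API), and `RetryThenDeadLetter`, `Outcome` and `callWithRetry` are hypothetical names. In a topology you might call it from `mapValues`, then use `filter` on `isFailure()` and `to(...)` a DLT topic of your choice. Keep the total retry time short: the sleep blocks the stream thread, and blocking longer than `max.poll.interval.ms` would trigger a rebalance.

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper: call a flaky dependency with bounded retries and, if every
 * attempt fails, return a failure Outcome instead of throwing, so the caller can
 * route the record to a dead-letter topic.
 */
public final class RetryThenDeadLetter {

    /** Either a transformed value or the error that prevented it (hypothetical type). */
    public static final class Outcome<T> {
        private final T value;
        private final RuntimeException error;

        private Outcome(T value, RuntimeException error) {
            this.value = value;
            this.error = error;
        }

        static <T> Outcome<T> success(T value) { return new Outcome<>(value, null); }

        static <T> Outcome<T> failure(RuntimeException error) { return new Outcome<>(null, error); }

        public boolean isFailure() { return error != null; }

        public T value() { return value; }

        public RuntimeException error() { return error; }
    }

    /**
     * Invokes {@code call} up to {@code maxAttempts} times, sleeping {@code backoffMillis}
     * between attempts. A RuntimeException from the call is never rethrown; the last one
     * is returned inside a failure Outcome.
     */
    public static <T> Outcome<T> callWithRetry(Supplier<T> call, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastError = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return Outcome.success(call.get());
            } catch (RuntimeException e) {
                lastError = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMillis); // blocks the calling (stream) thread
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // keep interrupt status, stop retrying
                        return Outcome.failure(e);
                    }
                }
            }
        }
        return Outcome.failure(lastError);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated microservice that fails twice, then answers
        Supplier<String> flaky = () -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("service unavailable");
            }
            return "transformed";
        };
        Outcome<String> ok = callWithRetry(flaky, 3, 10);
        System.out.println(ok.isFailure() + " " + ok.value() + " after " + calls[0] + " calls");

        // Service that is always down: every attempt fails, but nothing is thrown
        Outcome<String> down = callWithRetry(() -> { throw new IllegalStateException("down"); }, 2, 10);
        System.out.println(down.isFailure() + " " + down.error().getMessage());
    }
}
```

Returning a value rather than throwing is the key design choice: the exception never escapes the topology, so the stream thread keeps running and the failed record ends up where you decide.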


    Update for kafka-streams 2.8.0 and later

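    Since kafka-streams 2.8.0, a stream thread that died because of an uncaught exception can be replaced automatically: register a handler with the KafkaStreams method void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh); and return StreamThreadExceptionResponse.REPLACE_THREAD. The replacement thread resumes from the last committed offset, so the record that failed is processed again; if it fails deterministically, the thread will keep being replaced. For more details, see Kafka Streams Specific Uncaught Exception Handler (KIP-671).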

    // Must be called before kafkaStreams.start(); `log` is e.g. an SLF4J Logger
    kafkaStreams.setUncaughtExceptionHandler(ex -> {
        log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
    });