
Kafka Stream exceptions won't trigger commit on the input record


I have a Kafka Streams application that reads from one topic and writes to another. For example:

KStream<String, String> messageStream = builder.stream(inputTopic);
KStream<String, String> processedStream = messageStream.process(() -> new CustomProcessor());
processedStream.to(outputTopic);

Suppose a NullPointerException is thrown inside the process method. I have implemented an uncaught exception handler that handles this exception and sends the record to a dead letter queue. Of course, the stream then shuts down, and when it starts again it processes the same message again, because its offset was never committed. How can I avoid this? How can I commit the record when the exception occurs?


Solution

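  • Everything that happens inside CustomProcessor is under your control. Wrap your logic in a try/catch so the exception never reaches the uncaught exception handler; the record then counts as processed, its offset is committed as usual, and the stream keeps running. To route failures, forward a result that carries a "valid" flag and use KStream.split to split the processed stream into a DLQ branch and a regular result branch.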
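
As a rough illustration of the try/catch part, here is a minimal sketch in plain Java with no Kafka dependency, so the flagging logic can be tried in isolation. The names Result and GuardedLogic are hypothetical and not part of Kafka Streams, and String values are assumed because the question uses KStream<String, String>. The trailing comments show one way this might be wired into the processor and the topology; that wiring is untested here.

```java
import java.util.function.Function;

// Hypothetical wrapper type (not a Kafka Streams class): the processing
// outcome plus the "valid" flag that the split predicate can test.
record Result(boolean valid, String value, String error) {

    static Result ok(String value) {
        return new Result(true, value, null);
    }

    // Keeps the original input so the DLQ receives the record that failed.
    static Result failed(String originalInput, Exception cause) {
        return new Result(false, originalInput, cause.toString());
    }
}

// Hypothetical helper: runs the business logic and turns any runtime
// exception (for example an NPE) into a flagged Result instead of letting
// it propagate out of the processor.
final class GuardedLogic {

    private GuardedLogic() {}

    static Result apply(String input, Function<String, String> logic) {
        try {
            return Result.ok(logic.apply(input));
        } catch (RuntimeException e) {
            return Result.failed(input, e);
        }
    }
}

// Possible wiring, assuming the org.apache.kafka.streams.processor.api
// Processor interface and a hypothetical transform method holding the logic.
//
// Inside CustomProcessor.process(record):
//   context.forward(record.withValue(
//       GuardedLogic.apply(record.value(), this::transform)));
//
// In the topology, where processedStream is a KStream<String, Result>:
//   processedStream.split()
//       .branch((key, result) -> !result.valid(),
//               Branched.withConsumer(s -> s.mapValues(Result::value).to(dlqTopic)))
//       .defaultBranch(
//               Branched.withConsumer(s -> s.mapValues(Result::value).to(outputTopic)));
```

Mapping back to the String value before each to call means no custom serde is needed for Result, since the wrapper only lives between the processor and the split. Only RuntimeException is caught here; errors such as OutOfMemoryError still propagate to the uncaught exception handler.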