I have a Kafka Streams application that listens on one topic and outputs to another. For example:
KStream<String, String> messageStream = builder.stream(inputTopic);
KStream<String, String> processedStream = messageStream.process(() -> new CustomProcessor());
processedStream.to(outputTopic);
Inside the process method, let's say an NPE occurs. I have implemented an uncaught exception handler that handles this exception and sends the record to a dead letter queue. Of course, the stream will close, and after it starts again it will process the same message again (because it wasn't committed). How can I avoid this? How can I commit the record when I receive the exception?
Everything that happens inside CustomProcessor is under your control. You can wrap your logic in a try/catch so the exception never reaches the uncaught exception handler, and route the results to different streams, e.g. by outputting a "valid" flag and using KStream.split to split the result of your processing into a DLQ stream and the regular result.
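
As a rough illustration of the try/catch part, here is a minimal sketch in plain Java with no Kafka dependency. The names SafeProcessing, Result and safeApply are made up for this example and are not part of Kafka Streams, and the String::toUpperCase logic only stands in for whatever CustomProcessor really does.

```java
import java.util.function.Function;

class SafeProcessing {

    // Hypothetical wrapper type: the processed value plus a "valid" flag
    // that downstream code can branch on.
    static final class Result {
        final String value;
        final boolean valid;

        Result(String value, boolean valid) {
            this.value = value;
            this.valid = valid;
        }
    }

    // Runs the given logic and turns any runtime exception (such as an NPE)
    // into a Result flagged as invalid, keeping the original input so it
    // can be sent to the DLQ.
    static Result safeApply(Function<String, String> logic, String input) {
        try {
            return new Result(logic.apply(input), true);
        } catch (RuntimeException e) {
            return new Result(input, false);
        }
    }

    public static void main(String[] args) {
        Result ok = safeApply(String::toUpperCase, "hello");
        Result failed = safeApply(String::toUpperCase, null); // NPE is caught here
        System.out.println(ok.value + " valid=" + ok.valid);
        System.out.println(failed.value + " valid=" + failed.valid);
    }
}
```

Inside CustomProcessor.process you would do the equivalent of safeApply and forward the flagged result instead of letting the exception escape. Downstream, KStream.split with a branch on the valid flag can send invalid records to the DLQ topic and everything else to outputTopic. Since no exception leaves the processor, the stream keeps running and the offset of the failed record is committed like any other, so it should not be reprocessed after a restart.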