I am writing a Java application using Spring Cloud Stream Kafka Streams. Here is the functional method snippet I'm using:
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
return input ->
input.transform(
() ->
new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void close() {}
@Override
public KeyValue<String, String> transform(String key, String value) {
String result = fetch_data_from_database(key, value);
return new KeyValue<>(key, result);
}
});
fetch_data_from_database() can throw an Exception.
How can I stop the processing of the inbound KStream(offset should not get committed) in case of exception from fetch_from_database() and make it retry processing with the same offset data?
In this case, you need to retry the logic on your own. For that, you can use Spring's RetryTemplate
. This answer has the details about how to use the RetryTemplate
within Kafka Streams. It does not use the low-level Processor API as you have, but it's the same idea. Wrap your database call within a retry template and customize the retries based on your requirements. Any upstream processing will be paused until the retries are exhausted.