Search code examples
apache-kafkaapache-kafka-streamsspring-cloud-stream

How to handle errors occurring during the processing of data in Kafka Streams


I am writing a Java application using Spring Cloud Stream Kafka Streams. Here is the functional method snippet I'm using:

@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
    return input ->
        input.transform(
            () ->
                new Transformer<String, String, KeyValue<String, String>>() {

                  ProcessorContext context;

                  @Override
                  public void init(ProcessorContext context) {
                    this.context = context;
                  }

                  @Override
                  public void close() {}

                  @Override
                  public KeyValue<String, String> transform(String key, String value) {                       
                         String result = fetch_data_from_database(key, value);
                         return new KeyValue<>(key, result);
                  }
});

fetch_data_from_database() can throw an Exception.

How can I stop the processing of the inbound KStream(offset should not get committed) in case of exception from fetch_from_database() and make it retry processing with the same offset data?


Solution

  • In this case, you need to retry the logic on your own. For that, you can use Spring's RetryTemplate. This answer has the details about how to use the RetryTemplate within Kafka Streams. It does not use the low-level Processor API as you have, but it's the same idea. Wrap your database call within a retry template and customize the retries based on your requirements. Any upstream processing will be paused until the retries are exhausted.