Tags: spring-boot, apache-kafka, spring-cloud, apache-kafka-streams, spring-kafka

In the functional model of Spring Cloud Stream and Kafka, how can I send a record to another topic (an error topic) when an exception occurs?


The snippet below shows the function. Please suggest how to send data to different topics depending on whether processing throws an error.

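import java.util.function.Function;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

public Function<KStream<String, ?>, KStream<String, ?>> process() {
    return input -> input.map((key, value) -> {
        try {
            // logic of function here
        } catch (Exception e) {
            // How do I send to a different topic from here?
        }
        return new KeyValue<>(key, value);
    });
}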

Solution

  • Set the Kafka consumer binding's enableDlq option to true. When the listener throws an exception, the record is sent to the dead-letter topic after retries are exhausted. To fail immediately, set the consumer binding's maxAttempts property to 1 (the default is 3).

    See the documentation.

    enableDlq

    When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named error.<destination>.<group>. The DLQ topic name is configurable by setting the dlqName property or by defining a @Bean of type DlqDestinationResolver. This provides an alternative to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome. See Dead-Letter Topic Processing for more information. Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers: x-original-topic, x-exception-message, and x-exception-stacktrace as byte[]. By default, a failed record is sent to the same partition number in the DLQ topic as the original record. See Dead-Letter Topic Partition Selection for how to change that behavior. Not allowed when destinationIsPattern is true.

    Default: false.
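
As a rough illustration of the settings described above, the following application.properties fragment is a minimal sketch, not a verified configuration. It assumes the message-channel Kafka binder that the quoted enableDlq documentation describes; the property prefix is different for the Kafka Streams binder. The binding name process-in-0 follows the usual naming for a function bean called process, and the topic, group, and DLQ names are hypothetical.

```properties
# Hypothetical input binding for a function bean named "process".
spring.cloud.stream.bindings.process-in-0.destination=input-topic
spring.cloud.stream.bindings.process-in-0.group=my-group

# Fail on the first exception instead of retrying (the default is 3 attempts).
spring.cloud.stream.bindings.process-in-0.consumer.maxAttempts=1

# Kafka-binder consumer property: forward failed records to a dead-letter topic.
spring.cloud.stream.kafka.bindings.process-in-0.consumer.enableDlq=true

# Optional: override the default DLQ name (error.<destination>.<group>).
spring.cloud.stream.kafka.bindings.process-in-0.consumer.dlqName=input-topic-dlq
```

Without the dlqName line, failed records in this sketch would go to error.input-topic.my-group, following the default naming pattern quoted above.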