The below shows a snippet for the function, please suggest how to send data to different topics based on if it has error or not
public Function<KStream<String,?>, KStream<String,?>> process(){
return input -> input.map(key, value) {
try{
// logic of function here
}catch(Exception e) {
// How do I send to different topic from here??
}
return new KeyValue<>(key,value);
}
}
Set the kafka consumer binding's enableDlq
option to true; when the listener throws an exception the record is sent to the dead letter topic after retries are exhausted. If you want to fail immediately, set the consumer binding's maxAttempts
property to 1 (default is 3).
See the documentation.
enableDlq
When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named
error.<destination>.<group>
. The DLQ topic name can be configurable by setting thedlqName
property or by defining a @Bean of typeDlqDestinationResolver
. This provides an alternative option to the more common Kafka replay scenario for the case when the number of errors is relatively small and replaying the entire original topic may be too cumbersome. See Dead-Letter Topic Processing processing for more information. Starting with version 2.0, messages sent to the DLQ topic are enhanced with the following headers:x-original-topic, x-exception-message, and x-exception-stacktrace as byte[]
. By default, a failed record is sent to the same partition number in the DLQ topic as the original record. See Dead-Letter Topic Partition Selection for how to change that behavior. Not allowed whendestinationIsPattern
is true.
Default: false.