I want to skip messages that exceed the threshold value in the message headers. We can access the message headers in the streams using the KafkaStream#process method. But, I don't know how to skip messages that exceed the threshold value in the header. I couldn't use the filter because it couldn't access the header in the KStream#filter() method.
I'm trying to write in java using Spring Kafka Stream. Is there a way to do that?
public class MessageHeaderProcessor implements Processor<String, String, String, String> {
String RETRY_COUNT = "RetryCount";
Integer threshold;
public MessageHeaderProcessor(Integer threshold) {
this.threshold = threshold;
}
@Override
public void init(ProcessorContext context) {
Processor.super.init(context);
}
@SneakyThrows
@Override
public void process(Record record) {
Headers headers = record.headers();
Iterator<Header> retryCountHeader = headers.headers(RETRY_COUNT).iterator();
if (!retryCountHeader.hasNext()) {
headers.add(RETRY_COUNT, "1".getBytes());
} else {
headers.remove(RETRY_COUNT);
var retryCount = extractRetryCount(retryCountHeader.next().value());
var newRetryCount = String.valueOf(retryCount + 1).getBytes();
headers.add(RETRY_COUNT, newRetryCount);
if (retryCount > this.threshold) {
//should skip the message
}
}
}
private int extractRetryCount(final byte[] bytes) {
return Integer.parseInt(new String(bytes));
}
@Override
public void close() {
Processor.super.close();
}
}
public class EventStreamProcessor {
@Autowired
public void streamTopology(StreamsBuilder streamsBuilder) {
KStream<String, String> inputStream = streamsBuilder.stream("inputTopic");
inputStream.process(new MessageHeaderProcessorSupplier(10));
inputStream.to("outputTopic");
}
}
Using a Processor
you emit downstream record via context.forward()
. Thus, you can skip a message by not call forward()
inside process(...)
method.
public class MessageHeaderProcessor implements Processor<String, String, String, String> {
// ... omitted
private ProcessorContext context;
@Override
public void init(ProcessorContext<String, String> context) {
this.context = context;
}
@Override
public void process(Record<String, String> record) {
if (...) { // below threshold
context.forward(record);
}
}
}