Search code examples
javaapache-kafkaspring-kafkaapache-kafka-streams

How to skip the message according to the value in the message header in Kafka Streams?


I want to skip messages that exceed the threshold value in the message headers. We can access the message headers in the streams using the KafkaStream#process method. But, I don't know how to skip messages that exceed the threshold value in the header. I couldn't use the filter because it couldn't access the header in the KStream#filter() method.

I'm trying to write in java using Spring Kafka Stream. Is there a way to do that?

public class MessageHeaderProcessor implements Processor<String, String, String, String> {

    String RETRY_COUNT = "RetryCount";
    Integer threshold;

    public MessageHeaderProcessor(Integer threshold) {
        this.threshold = threshold;
    }

    @Override
    public void init(ProcessorContext context) {
        Processor.super.init(context);
    }

    @SneakyThrows
    @Override
    public void process(Record record) {
        Headers headers = record.headers();
        Iterator<Header> retryCountHeader = headers.headers(RETRY_COUNT).iterator();
        if (!retryCountHeader.hasNext()) {
            headers.add(RETRY_COUNT, "1".getBytes());
        } else {
            headers.remove(RETRY_COUNT);
            var retryCount = extractRetryCount(retryCountHeader.next().value());
            var newRetryCount = String.valueOf(retryCount + 1).getBytes();
            headers.add(RETRY_COUNT, newRetryCount);
            if (retryCount > this.threshold) {
              //should skip the message                
            }
        }
    }

    private int extractRetryCount(final byte[] bytes) {
        return Integer.parseInt(new String(bytes));
    }

    @Override
    public void close() {
        Processor.super.close();
    }
}


public class EventStreamProcessor {
  
  @Autowired
  public void streamTopology(StreamsBuilder streamsBuilder) {
      KStream<String, String> inputStream = streamsBuilder.stream("inputTopic");
      inputStream.process(new MessageHeaderProcessorSupplier(10));
      inputStream.to("outputTopic");
  }
}

Solution

  • Using a Processor you emit downstream record via context.forward(). Thus, you can skip a message by not call forward() inside process(...) method.

    public class MessageHeaderProcessor implements Processor<String, String, String, String> {
      // ... omitted
    
      private ProcessorContext context;
    
      @Override
      public void init(ProcessorContext<String, String> context) {
        this.context = context;
      }
    
      @Override
      public void process(Record<String, String> record) {
        if (...) { // below threshold
          context.forward(record);
        }
      }  
    }