I have a spring-cloud-stream project that uses the Kafka binder. The application consumes messages in batch mode, and I need to filter the consumed records by a specific header. For this I use a BatchInterceptor:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<String, String>> customizer(
        BatchInterceptor<String, String> customInterceptor) {

    return (container, destinationName, group) -> {
        container.setBatchInterceptor(customInterceptor);
        log.info("Container customized");
    };
}
@Bean
public BatchInterceptor<String, String> customInterceptor() {
    return (consumerRecords, consumer) -> {
        log.info("Origin records count: {}", consumerRecords.count());
        final Set<TopicPartition> partitions = consumerRecords.partitions();
        final Map<TopicPartition, List<ConsumerRecord<String, String>>> filteredByHeader =
                Stream.of(partitions).flatMap(Collection::stream)
                        .collect(Collectors.toMap(
                                Function.identity(),
                                p -> Stream.ofNullable(consumerRecords.records(p))
                                        .flatMap(Collection::stream)
                                        .filter(r -> Objects.nonNull(r.headers().lastHeader("TEST")))
                                        .collect(Collectors.toList())));
        var filteredRecords = new ConsumerRecords<>(filteredByHeader);
        log.info("Filtered count: {}", filteredRecords.count());
        return filteredRecords;
    };
}
Full example code is here: batch interceptor example.
In the logs I can see that the records are filtered successfully, but the filtered-out records still reach the consumer.
Why does the BatchInterceptor not filter the records? How can I filter ConsumerRecords by a specific header in spring-cloud-stream with batch mode enabled? You can run the tests from the example to reproduce the behavior.
You are using very old versions (Boot 2.5.0) that are out of OSS support:
https://spring.io/projects/spring-boot#support
(The same applies to the Spring Cloud version.)
I tested your interceptor with current versions (Boot 2.7.5, Cloud 2021.0.4) and it works fine:
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.BatchInterceptor;

@SpringBootApplication
public class So74203611Application {

    private static final Logger log = LoggerFactory.getLogger(So74203611Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So74203611Application.class, args);
    }

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<String, String>> customizer(
            BatchInterceptor<String, String> customInterceptor) {

        return (container, destinationName, group) -> {
            container.setBatchInterceptor(customInterceptor);
            log.info("Container customized {}", destinationName);
        };
    }

    @Bean
    public BatchInterceptor<String, String> customInterceptor() {
        return (consumerRecords, consumer) -> {
            log.info("Origin records count: {}", consumerRecords.count());
            final Set<TopicPartition> partitions = consumerRecords.partitions();
            final Map<TopicPartition, List<ConsumerRecord<String, String>>> filteredByHeader = Stream.of(partitions)
                    .flatMap(Collection::stream)
                    .collect(Collectors.toMap(Function.identity(),
                            p -> Stream.ofNullable(consumerRecords.records(p)).flatMap(Collection::stream)
                                    .filter(r -> Objects.nonNull(r.headers().lastHeader("TEST")))
                                    .collect(Collectors.toList())));
            var filteredRecords = new ConsumerRecords<>(filteredByHeader);
            log.info("Filtered count: {}", filteredRecords.count());
            return filteredRecords;
        };
    }

    @Bean
    Consumer<List<String>> input() {
        return str -> {
            System.out.println(str);
        };
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            // One record with the TEST header, two without
            Headers headers = new RecordHeaders();
            headers.add("TEST", "foo".getBytes());
            ProducerRecord<byte[], byte[]> rec = new ProducerRecord<>("input-in-0", 0, 0L, null, "bar".getBytes(),
                    headers);
            template.send(rec);
            headers = new RecordHeaders();
            rec = new ProducerRecord<>("input-in-0", 0, 0L, null, "baz".getBytes(), headers);
            template.send(rec);
            template.send(rec);
        };
    }

}
application.properties:

spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
The console output shows that only the record carrying the TEST header reached the consumer:

[bar]
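If upgrading is not immediately possible, records can also be skipped inside the function itself: in batch mode the binder exposes each record's converted headers, in payload order, under KafkaHeaders.BATCH_CONVERTED_HEADERS. Below is a minimal sketch of that workaround (not part of the answer above), replacing the input() bean and assuming the default header mapping carries the TEST header through:

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@Bean
Consumer<Message<List<String>>> input() {
    return message -> {
        // One map of converted headers per record, parallel to the payload list
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> batchHeaders = (List<Map<String, Object>>)
                message.getHeaders().get(KafkaHeaders.BATCH_CONVERTED_HEADERS);
        List<String> payloads = message.getPayload();
        for (int i = 0; i < payloads.size(); i++) {
            // Process only records that carry the TEST header
            if (batchHeaders != null && batchHeaders.get(i).containsKey("TEST")) {
                System.out.println(payloads.get(i));
            }
        }
    };
}

This filters after the batch has been delivered rather than before, so the interceptor approach (with supported versions) remains the cleaner solution.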