Tags: java, spring, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Spring-Kafka BatchInterceptor doesn't work as expected


I have a spring-cloud-stream project that uses the Kafka binder. The application consumes messages in batch mode. I need to filter the consumed records by a specific header, so I use a BatchInterceptor:

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<String, String>> customizer(
    BatchInterceptor<String, String> customInterceptor
) {
    return (((container, destinationName, group) -> {
        container.setBatchInterceptor(customInterceptor);
        log.info("Container customized");
    }));
}

@Bean
public BatchInterceptor<String, String> customInterceptor() {
    return (consumerRecords, consumer) -> {
        log.info("Origin records count: {}", consumerRecords.count());
        final Set<TopicPartition> partitions = consumerRecords.partitions();
        final Map<TopicPartition, List<ConsumerRecord<String, String>>> filteredByHeader
            = Stream.of(partitions).flatMap(Collection::stream)
            .collect(Collectors.toMap(
                Function.identity(),
                p -> Stream.ofNullable(consumerRecords.records(p))
                    .flatMap(Collection::stream)
                    .filter(r -> Objects.nonNull(r.headers().lastHeader("TEST")))
                    .collect(Collectors.toList())
            ));
        var filteredRecords = new ConsumerRecords<>(filteredByHeader);
        log.info("Filtered count: {}", filteredRecords.count());
        return filteredRecords;
    };
}

Example code is here: batch interceptor example.

In the logs I can see that the records are filtered successfully, but the filtered-out records still reach the consumer.

Why does the BatchInterceptor not filter the records? How can I filter ConsumerRecords by a specific header in spring-cloud-stream with batch mode enabled? You can run the tests from the example to reproduce the behavior.


Solution

  • You are using a very old version (Boot 2.5.0), which is out of OSS support:

    https://spring.io/projects/spring-boot#support

    (The same is true of the Spring Cloud version.)
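
    For reference, here is a minimal sketch of what the upgrade could look like, assuming a Gradle Kotlin DSL build; the plugin and BOM coordinates below are the standard Spring ones, not taken from your project:

    plugins {
        java
        id("org.springframework.boot") version "2.7.5"
        id("io.spring.dependency-management") version "1.0.15.RELEASE"
    }

    repositories {
        mavenCentral()
    }

    dependencies {
        // the Kafka binder pulls in spring-cloud-stream and spring-kafka transitively
        implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")
    }

    dependencyManagement {
        imports {
            // the 2021.0.x release train is the one that matches Boot 2.6.x/2.7.x
            mavenBom("org.springframework.cloud:spring-cloud-dependencies:2021.0.4")
        }
    }

    A Maven build would pin the same versions through spring-boot-starter-parent and the spring-cloud-dependencies BOM.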

    I tested your interceptor with current versions and it works fine.

    Boot 2.7.5, cloud 2021.0.4:

    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
    import java.util.Set;
    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.header.internals.RecordHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;
    import org.springframework.kafka.listener.BatchInterceptor;

    @SpringBootApplication
    public class So74203611Application {
    
        private static final Logger log = LoggerFactory.getLogger(So74203611Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So74203611Application.class, args);
        }
    
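        // Install the custom BatchInterceptor on every listener container the binder creates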
        @Bean
        public ListenerContainerCustomizer<AbstractMessageListenerContainer<String, String>> customizer(
                BatchInterceptor<String, String> customInterceptor) {
    
            return (((container, destinationName, group) -> {
                container.setBatchInterceptor(customInterceptor);
                log.info("Container customized {}", destinationName);
            }));
        }
    
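        // Keep only the records that carry a "TEST" header; drop the rest from the batch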
        @Bean
        public BatchInterceptor<String, String> customInterceptor() {
            return (consumerRecords, consumer) -> {
                log.info("Origin records count: {}", consumerRecords.count());
                final Set<TopicPartition> partitions = consumerRecords.partitions();
                final Map<TopicPartition, List<ConsumerRecord<String, String>>> filteredByHeader = Stream.of(partitions)
                        .flatMap(Collection::stream)
                        .collect(Collectors.toMap(Function.identity(),
                                p -> Stream.ofNullable(consumerRecords.records(p)).flatMap(Collection::stream)
                                        .filter(r -> Objects.nonNull(r.headers().lastHeader("TEST")))
                                        .collect(Collectors.toList())));
                var filteredRecords = new ConsumerRecords<>(filteredByHeader);
                log.info("Filtered count: {}", filteredRecords.count());
                return filteredRecords;
            };
        }
    
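        // Batch-mode functional consumer: receives each (already filtered) poll as a list of payloads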
        @Bean
        Consumer<List<String>> input() {
            return str -> {
                System.out.println(str);
            };
        }
    
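        // Send one record with the TEST header (value "bar") and two without it ("baz");
        // only "bar" should be delivered to the consumer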
        @Bean
        ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
            return args -> {
                Headers headers = new RecordHeaders();
                headers.add("TEST", "foo".getBytes());
                ProducerRecord<byte[], byte[]> rec = new ProducerRecord<>("input-in-0", 0, 0L, null, "bar".getBytes(),
                        headers);
                template.send(rec);
                headers = new RecordHeaders();
                rec = new ProducerRecord<>("input-in-0", 0, 0L, null, "baz".getBytes(), headers);
                template.send(rec);
                template.send(rec);
            };
        }
    
    }
    
    spring.cloud.stream.bindings.input-in-0.group=foo
    spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
    
    [bar]

    Only the record that was published with the TEST header is delivered to the consumer; the two records sent without the header are removed from the batch by the interceptor.