Search code examples
javaspringapache-kafkaspring-kafka

How do I get a Kafka message before the @KafkaListener method?


My task is to get the Kafka message before the method with the @KafkaListner annotation, check the correlationId and requestId headers in it. If they're present, flush them to MDC or generate them otherwise.

And my question is how to get Kafka message with headers before method with the @KafkaListner?


Solution

  • You can try to write your own ConsumerInterceptor following instructions from here.
    Apache Kafka provides a mechanism to add interceptors to producers and consumers. These objects are managed by Kafka, not Spring, and so normal Spring dependency injection won’t work for wiring in dependent Spring Beans. However, you can manually wire in those dependencies using the interceptor config() method. The following Spring Boot application shows how to do this by overriding boot’s default factories to add some dependent bean into the configuration properties.

    ConsumerFactory definition:

        @Bean
        public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
            Map<String, Object> consumerProperties = new HashMap<>();
            // consumerProperties.put(..., ...)
            // ...
            consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
            consumerProperties.put("some.bean", someBean);
            return new DefaultKafkaConsumerFactory<>(consumerProperties);
        }
    
    

    Interceptor definition:

    public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
    
        private SomeBean bean;
    
        @Override
        public void configure(Map<String, ?> configs) {
            this.bean = (SomeBean) configs.get("some.bean");
        }
    
        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            this.bean.someMethod("consumer interceptor");
            return records;
        }
    
        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        }
    
        @Override
        public void close() {
        }
    
    }