Search code examples
spring-bootspring-kafka

Spring Kafka - Centralized @KafkaListener logging


I'd like to centralize all the @KafkaTemplate message logging, when a message arrives, when the processing success, and when an exception is thrown by a @KafkaListener

I don't want to use AOP, due to the performance impact, i would prefer to use a RecordInterceptor like this: Log the exceptions thrown in spring kafka listener

I'm using the KafkaAutoConfiguration (all configured via properties), so the auto configuration is creating a ConcurrentKafkaListenerContainerFactory bean without a RecordInterceptor configured

I thought that having a RecordInterceptor bean in the spring context would be enough, but it's not the case, it doesn't intercept anything (the @KafkaListener keep receiveing messages correctly)

@Slf4j
@Component
public class LoggingRecordInterceptor implements RecordInterceptor<String, Object> {

    @Override
    public ConsumerRecord<String, Object> intercept(ConsumerRecord record) {
        log.info("sample logging");
        return null;
    }

    @Override
    public ConsumerRecord<String, Object> intercept(ConsumerRecord record, Consumer consumer) {
        log.info("sample logging");
        return RecordInterceptor.super.intercept(record, consumer);
    }

    @Override
    public void success(ConsumerRecord record, Consumer consumer) {
        log.info("sample logging");
        RecordInterceptor.super.success(record, consumer);
    }

    @Override
    public void failure(ConsumerRecord record, Exception exception, Consumer consumer) {
        log.info("sample logging");
        RecordInterceptor.super.failure(record, exception, consumer);
    }
}

What am i missing? How can i configure it using KafkaAutoConfiguration?

Thanks in advance


Solution

  • Solved. I think it's a Spring bug.

    KafkaAnnotationDrivenConfiguration constructor has the following signature:

    KafkaAnnotationDrivenConfiguration(...,
                ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {}
    

    My LoggingRecordInterceptor was implementing RecordInterceptor<String, Object>

    I thought it was supposed to work (since String obviously extends Object), but actually spring was not able to inject it correctly.

    So i changed my LoggingRecordInterceptor as the following, and now works fine.

    But i think it's a spring bug, it should handle generics, since the RecordInterceptor interface suggests it.

    @Slf4j
    @Component
    public class LoggingRecordInterceptor implements RecordInterceptor<Object, Object> {
    
        @Override
        public ConsumerRecord<Object, Object> intercept(@NonNull ConsumerRecord<Object, Object> consumerRecord) {
            log.info("Topic: {} - Message received - Payload: {}",
                    consumerRecord.topic(), consumerRecord.value());
            return consumerRecord;
        }
    
        @Override
        public void success(@NonNull ConsumerRecord<Object, Object> consumerRecord,
                            @NonNull Consumer<Object, Object> consumer) {
            log.info("Topic: {} - Message processed - Payload: {}",
                    consumerRecord.topic(), consumerRecord.value());
            RecordInterceptor.super.success(consumerRecord, consumer);
        }
    
        @Override
        public void failure(@NonNull ConsumerRecord<Object, Object> consumerRecord,
                            @NonNull Exception exception,
                            @NonNull Consumer<Object, Object> consumer) {
            log.error("Topic: {} - Error processing a message - Payload: {}",
                    consumerRecord.topic(), consumerRecord.value());
            RecordInterceptor.super.failure(consumerRecord, exception, consumer);
        }
    }