I'd like to centralize all the @KafkaTemplate message logging, when a message arrives, when the processing success, and when an exception is thrown by a @KafkaListener
I don't want to use AOP, due to the performance impact, i would prefer to use a RecordInterceptor like this: Log the exceptions thrown in spring kafka listener
I'm using the KafkaAutoConfiguration (all configured via properties), so the auto configuration is creating a ConcurrentKafkaListenerContainerFactory bean without a RecordInterceptor configured
I thought that having a RecordInterceptor bean in the spring context would be enough, but it's not the case, it doesn't intercept anything (the @KafkaListener keep receiveing messages correctly)
@Slf4j
@Component
public class LoggingRecordInterceptor implements RecordInterceptor<String, Object> {
@Override
public ConsumerRecord<String, Object> intercept(ConsumerRecord record) {
log.info("sample logging");
return null;
}
@Override
public ConsumerRecord<String, Object> intercept(ConsumerRecord record, Consumer consumer) {
log.info("sample logging");
return RecordInterceptor.super.intercept(record, consumer);
}
@Override
public void success(ConsumerRecord record, Consumer consumer) {
log.info("sample logging");
RecordInterceptor.super.success(record, consumer);
}
@Override
public void failure(ConsumerRecord record, Exception exception, Consumer consumer) {
log.info("sample logging");
RecordInterceptor.super.failure(record, exception, consumer);
}
}
What am i missing? How can i configure it using KafkaAutoConfiguration?
Thanks in advance
Solved. I think it's a Spring bug.
KafkaAnnotationDrivenConfiguration constructor has the following signature:
KafkaAnnotationDrivenConfiguration(...,
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {}
My LoggingRecordInterceptor
was implementing RecordInterceptor<String, Object>
I thought it was supposed to work (since String obviously extends Object), but actually spring was not able to inject it correctly.
So i changed my LoggingRecordInterceptor
as the following, and now works fine.
But i think it's a spring bug, it should handle generics, since the RecordInterceptor interface suggests it.
@Slf4j
@Component
public class LoggingRecordInterceptor implements RecordInterceptor<Object, Object> {
@Override
public ConsumerRecord<Object, Object> intercept(@NonNull ConsumerRecord<Object, Object> consumerRecord) {
log.info("Topic: {} - Message received - Payload: {}",
consumerRecord.topic(), consumerRecord.value());
return consumerRecord;
}
@Override
public void success(@NonNull ConsumerRecord<Object, Object> consumerRecord,
@NonNull Consumer<Object, Object> consumer) {
log.info("Topic: {} - Message processed - Payload: {}",
consumerRecord.topic(), consumerRecord.value());
RecordInterceptor.super.success(consumerRecord, consumer);
}
@Override
public void failure(@NonNull ConsumerRecord<Object, Object> consumerRecord,
@NonNull Exception exception,
@NonNull Consumer<Object, Object> consumer) {
log.error("Topic: {} - Error processing a message - Payload: {}",
consumerRecord.topic(), consumerRecord.value());
RecordInterceptor.super.failure(consumerRecord, exception, consumer);
}
}