Search code examples
javaspring-kafkaspring-cloud-stream

How to set RecordInterceptor for all the MessageListenerContainers created by spring-kafka


I am using spring-kafka 2.5.4 with spring boot 2.3.2. I have some methods annotated with @StreamListener. I want to add a universal interceptor for all the methods. I have been trying it with @EnableKafka with ConcurrentKafkaListenerContainerFactory bean but it is not working.

I also don't want to configure ConsumerFactory by my own but let spring-kafka take care of it. I just want to build upon the existing configuration and add my RecordInterceptor on top of it.


Solution

  • spring-cloud-stream does not use that container factory, it creates its own containers.

    With spring-cloud-stream, you need to add a ListenerContainerCustomizer bean to modify the container(s).