Hooks in Kafka Listener

Are there any hooks available before/after a Kafka listener receives a message?

Use case: an MDC correlation ID has to be set to make the logs traceable.

What I am looking for: a before/after callback method, so that the MDC correlation ID can be set on entry and the MDC cleaned up on exit.

Edited scenario: I receive the correlation ID as part of the Kafka headers, and I want to set it in the MDC as soon as I receive a message in the Kafka listener.

Any help is appreciated.


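  • You can add an around advice to your listener bean...

    @SpringBootApplication
    public class So59854374Application {
        public static void main(String[] args) {
            SpringApplication.run(So59854374Application.class, args);
        }

        @Bean
        public static BeanPostProcessor bpp() { // static is important
            return new BeanPostProcessor() {
                @Override
                public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                    if (bean instanceof MyListener) {
                        // wrap the listener bean in a proxy that carries the advice
                        ProxyFactoryBean pfb = new ProxyFactoryBean();
                        pfb.setTarget(bean);
                        pfb.addAdvice(new MethodInterceptor() {
                            @Override
                            public Object invoke(MethodInvocation invocation) throws Throwable {
                                try {
                                    // "before" logic goes here, e.g. set the MDC
                                    return invocation.proceed();
                                } finally {
                                    // "after" logic goes here, e.g. clear the MDC
                                }
                            }
                        });
                        return pfb.getObject();
                    }
                    return bean;
                }
            };
        }

        @Bean
        public NewTopic topic() {
            // partition count and replication factor are assumed here
            return new NewTopic("so59854374", 1, (short) 1);
        }

        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> template.send("so59854374", "foo");
        }
    }

    @Component
    class MyListener {
        @KafkaListener(id = "so59854374", topics = "so59854374")
        public void listen(String in) {
            System.out.println(in);
        }
    }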

    If you add @Header("myMdcHeader") byte[] mdc as an additional parameter to your Kafka listener method, you can read it in the advice with invocation.getArguments()[1].
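
    To make the getArguments()[1] idea concrete, here is a minimal sketch of the set-on-entry, clear-on-exit pattern. It deliberately avoids Spring and SLF4J so it runs on a bare JDK: the MDC map, the Listener interface, the withMdc helper and the "correlationId" key are all hypothetical stand-ins, and a JDK dynamic proxy plays the role of the Spring MethodInterceptor. In a real application you would presumably call org.slf4j.MDC.put and MDC.remove at the same two points inside invoke().

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MdcAdviceSketch {

    // Stand-in for org.slf4j.MDC (assumption: lets the sketch run without SLF4J).
    static final ThreadLocal<Map<String, String>> MDC = ThreadLocal.withInitial(HashMap::new);

    // Hypothetical listener contract; the second parameter plays the role of
    // the @Header("myMdcHeader") byte[] argument of the real listener method.
    interface Listener {
        void listen(String in, byte[] mdc);
    }

    // Wraps the target so each call sets the correlation id first and clears it afterwards.
    static Listener withMdc(Listener target) {
        InvocationHandler advice = (proxy, method, args) -> {
            if (args == null || args.length < 2) {
                return method.invoke(target, args); // toString(), equals(), ... pass through
            }
            byte[] header = (byte[]) args[1]; // same idea as invocation.getArguments()[1]
            try {
                if (header != null) {
                    MDC.get().put("correlationId", new String(header, StandardCharsets.UTF_8));
                }
                return method.invoke(target, args); // counterpart of invocation.proceed()
            } finally {
                MDC.get().remove("correlationId"); // runs even if the listener throws
            }
        };
        return (Listener) Proxy.newProxyInstance(
                Listener.class.getClassLoader(), new Class<?>[] {Listener.class}, advice);
    }

    public static void main(String[] args) {
        Listener listener = withMdc((in, mdc) ->
                System.out.println("[" + MDC.get().get("correlationId") + "] " + in));
        listener.listen("foo", "abc-123".getBytes(StandardCharsets.UTF_8));
        // prints "[abc-123] foo"
        System.out.println("after call: " + MDC.get().get("correlationId"));
        // prints "after call: null"
    }
}
```

    The important part is the try/finally: the MDC is cleared in finally, so a listener that throws does not leak its correlation ID into the next record handled on the same consumer thread.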

    Another solution is to add a RecordInterceptor to the listener container factory, which allows you to access the raw ConsumerRecord before it is passed to the listener adapter.

    /**
     * An interceptor for {@link ConsumerRecord} invoked by the listener
     * container before invoking the listener.
     * @param <K> the key type.
     * @param <V> the value type.
     * @author Gary Russell
     * @since 2.2.7
     */
    public interface RecordInterceptor<K, V> {

        /**
         * Perform some action on the record or return a different one.
         * If null is returned the record will be skipped.
         * @param record the record.
         * @return the record or null.
         */
        ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);

    }

    /**
     * Set an interceptor to be called before calling the listener.
     * Does not apply to batch listeners.
     * @param recordInterceptor the interceptor.
     * @since 2.2.7
     */
    public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
        this.recordInterceptor = recordInterceptor;
    }

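    If you are using a batch listener, the Kafka client itself provides a ConsumerInterceptor (registered through the interceptor.classes consumer property), whose onConsume method sees each batch of records before it reaches the listener.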