Search code examples
springapache-kafkaslf4jspring-kafkamdc

Hooks in Kafka Listener


Is there any sort of hooks available before / after kafka listen a message ?

Use case : MDC co-relation id has to be set for to perform the log traceability

What I am looking for ? A before/after call back method so that MDC co-relation id can be set on entry and eventually clean MDC upon exit.

Edited Scenario: I am getting co-relation id as a part of Kafka Headers and I want to set the same in MDC as soon as I receive a message in Kafka Listener

Appreciated for help


Solution

  • You can add an around advice to your listener bean...

    @SpringBootApplication
    public class So59854374Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So59854374Application.class, args);
        }
    
        @Bean
        public static BeanPostProcessor bpp() { // static is important
            return new BeanPostProcessor() {
    
                @Override
                public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                    if (bean instanceof MyListener) {
                        ProxyFactoryBean pfb = new ProxyFactoryBean();
                        pfb.setTarget(bean);
                        pfb.addAdvice(new MethodInterceptor() {
    
                            @Override
                            public Object invoke(MethodInvocation invocation) throws Throwable {
                                try {
                                    System.out.println("Before");
                                    return invocation.proceed();
                                }
                                finally {
                                    System.out.println("After");
                                }
                            }
    
                        });
                        return pfb.getObject();
                    }
                    return bean;
                }
    
            };
        }
    
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so59854374").partitions(1).replicas(1).build();
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> template.send("so59854374", "foo");
        }
    
    }
    
    @Component
    class MyListener {
    
        @KafkaListener(id = "so59854374", topics = "so59854374")
        public void listen(String in) {
            System.out.println(in);
        }
    
    }
    

    and

    Before
    foo
    After
    

    EDIT

    If you add @Header("myMdcHeader") byte[] mdc as an additional parameter to your kafka listener method, you can use getArguments()[1] on the invocation.

    Another solution is to add a RecordInterceptor to the listener container factory, which allows you to access the raw ConsumerRecord before it is passed to the listener adapter.

    /**
     * An interceptor for {@link ConsumerRecord} invoked by the listener
     * container before invoking the listener.
     *
     * @param <K> the key type.
     * @param <V> the value type.
     *
     * @author Gary Russell
     * @since 2.2.7
     *
     */
    @FunctionalInterface
    public interface RecordInterceptor<K, V> {
    
        /**
         * Perform some action on the record or return a different one.
         * If null is returned the record will be skipped.
         * @param record the record.
         * @return the record or null.
         */
        @Nullable
        ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);
    
    }
    
    /**
     * Set an interceptor to be called before calling the listener.
     * Does not apply to batch listeners.
     * @param recordInterceptor the interceptor.
     * @since 2.2.7
     */
    public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
        this.recordInterceptor = recordInterceptor;
    }
    

    If you are using a batch listener, Kafka provides a ConsumerInterceptor.