
How to get the ThreadLocal for a concurrent consumer?


I am developing a Spring Kafka consumer. Because of the message volume, I need to use concurrency to keep up throughput, so I use a ThreadLocal object to hold per-thread data. Now I need to remove this ThreadLocal value after it has been used. The Spring documentation linked below suggests implementing an EventListener that listens for ConsumerStoppedEvent, but it gives no sample listener code showing how to get the ThreadLocal object and remove its value. How do I get the ThreadLocal instance in this case? Code samples would be appreciated. https://docs.spring.io/spring-kafka/docs/current/reference/html/#thread-safety


Solution

  • Something like this:

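    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.event.ConsumerStoppedEvent;
    import org.springframework.stereotype.Component;
    
    @SpringBootApplication
    public class So71884752Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So71884752Application.class, args);
        }
    
        // Two partitions, so each of the two consumers gets one.
        @Bean
        public NewTopic topic2() {
            return TopicBuilder.name("topic1").partitions(2).build();
        }
    
        @Component
        static class MyListener implements ApplicationListener<ConsumerStoppedEvent> {
    
            // Static, so the listener method and the event handler share the same ThreadLocal.
            private static final ThreadLocal<Long> threadLocalState = new ThreadLocal<>();
    
            // concurrency = "2" starts two consumers, each on its own thread.
            @KafkaListener(topics = "topic1", groupId = "my-consumer", concurrency = "2")
            public void listen() {
                long id = Thread.currentThread().getId();
                System.out.println("set thread id to ThreadLocal: " + id);
                threadLocalState.set(id);
            }
    
            // Called on the consumer thread that is stopping, so get()/remove()
            // act on that thread's value.
            @Override
            public void onApplicationEvent(ConsumerStoppedEvent event) {
                System.out.println("Remove from ThreadLocal: " + threadLocalState.get());
                threadLocalState.remove();
            }
    
        }
    
    }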
    

    So, I have two concurrent listener containers, one for each of the two partitions in the topic. Each of them calls this same @KafkaListener method on its own consumer thread. To keep the use case simple for testing the feature, I just store the thread id in the ThreadLocal.

    Then I implement ApplicationListener<ConsumerStoppedEvent>. That event is emitted on the corresponding consumer thread, so the listener can read the ThreadLocal value and clean it up at the end of the consumer's life.

    The test against embedded Kafka looks like this:

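    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.test.context.EmbeddedKafka;
    import org.springframework.test.annotation.DirtiesContext;
    
    @SpringBootTest
    @EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
    @DirtiesContext
    class So71884752ApplicationTests {
    
        @Autowired
        KafkaTemplate<String, String> kafkaTemplate;
    
        @Autowired
        KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    
        @Test
        void contextLoads() throws InterruptedException {
            this.kafkaTemplate.send("topic1", "1", "foo");
            this.kafkaTemplate.send("topic1", "2", "bar");
            this.kafkaTemplate.flush();
    
            Thread.sleep(1000); // Give it a chance to consume data
    
            // Stopping the listener containers triggers ConsumerStoppedEvent.
            this.kafkaListenerEndpointRegistry.stop();
        }
    
    }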
    

    Right, it doesn't verify anything, but it demonstrates how that event can happen.

    I see something like this in the log output:

    set thread id to ThreadLocal: 125
    set thread id to ThreadLocal: 127
    ...
    Remove from ThreadLocal: 125
    Remove from ThreadLocal: 127
    

    So what that doc says is correct.
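
The answer depends on ConsumerStoppedEvent being handled on the consumer thread itself, because ThreadLocal.remove() only clears the value of the thread that calls it. The following is a minimal plain-Java sketch of that point, not Spring Kafka code: a single-thread executor stands in for one consumer thread, and the class name ThreadLocalCleanupSketch, the field STATE, and the method valueSeenByWorker are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalCleanupSketch {

    // Stands in for the answer's threadLocalState (hypothetical name).
    private static final ThreadLocal<String> STATE = new ThreadLocal<>();

    /**
     * Sets a value on a worker thread, calls remove() either on that worker
     * or on the calling thread, then returns what the worker still sees.
     */
    static String valueSeenByWorker(boolean removeOnWorker) {
        // One thread, reused for every task: a stand-in for a consumer thread.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Runnable store = () -> STATE.set("worker-state");
        Runnable clear = STATE::remove;
        Callable<String> read = STATE::get;
        try {
            worker.submit(store).get();
            if (removeOnWorker) {
                worker.submit(clear).get(); // runs on the worker thread
            } else {
                clear.run();                // runs on the calling thread only
            }
            return worker.submit(read).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("removed on another thread -> " + valueSeenByWorker(false));
        System.out.println("removed on the worker thread -> " + valueSeenByWorker(true));
    }
}
```

Removing from a different thread leaves the worker's value in place, while removing on the worker thread clears it. This is why cleanup code placed in a ConsumerStoppedEvent listener reaches the right value only when the event is delivered on the consumer thread, as the log output in the answer suggests.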