Search code examples

How to get threadlocal for concurrency consumer?

I am developing spring kafka consumer. Due to message volume, I need use concurrency to make sure throughput. Due to used concurrency, I used threadlocal object to save thread based data. Now I need remove this threadlocal object after use it. Spring document with below links suggested to implement a EventListener which listen to event ConsumerStoppedEvent . But did not mention any sample eventlistener code to get threadlocal object and remove the value. May you please let me know how to get the threadlocal instance in this case? Code samples will be appreciated.


  • Something like this:

    public class So71884752Application {
        public static void main(String[] args) {
  , args);
        public NewTopic topic2() {
        static class MyListener implements ApplicationListener<ConsumerStoppedEvent> {
            private static final ThreadLocal<Long> threadLocalState = new ThreadLocal<>();
            @KafkaListener(topics = "topic1", groupId = "my-consumer", concurrency = "2")
            public void listen() {
                long id = Thread.currentThread().getId();
                System.out.println("set thread id to ThreadLocal: " + id);
            public void onApplicationEvent(ConsumerStoppedEvent event) {
                System.out.println("Remove from ThreadLocal: " + threadLocalState.get());

    So, I have two concurrent listener containers for those two partitions in the topic. Each of them is going to call this my @KafkaListener method anyway. I store the thread id into the ThreadLocal. For simple use-case and testing the feature.

    The I implement ApplicationListener<ConsumerStoppedEvent> which is emitted in the appropriate consumer thread. And that one helps me to extract ThreadLocal value and clean it up in the end of consumer life.

    The test against embedded Kafka looks like this:

    @EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
    class So71884752ApplicationTests {
        KafkaTemplate<String, String> kafkaTemplate;
        KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
        void contextLoads() throws InterruptedException {
            this.kafkaTemplate.send("topic1", "1", "foo");
            this.kafkaTemplate.send("topic1", "2", "bar");
            Thread.sleep(1000); // Give it a chance to consume data

    Right. It doesn't verify anything, but it demonstrate how that event can happen.

    I see something like this in log output:

    set thread id to ThreadLocal: 125
    set thread id to ThreadLocal: 127
    Remove from ThreadLocal: 125
    Remove from ThreadLocal: 127

    So, whatever that doc says is correct.