I am developing spring kafka consumer. Due to message volume, I need use concurrency to make sure throughput. Due to used concurrency, I used threadlocal object to save thread based data. Now I need remove this threadlocal object after use it. Spring document with below links suggested to implement a EventListener which listen to event ConsumerStoppedEvent . But did not mention any sample eventlistener code to get threadlocal object and remove the value. May you please let me know how to get the threadlocal instance in this case? Code samples will be appreciated. https://docs.spring.io/spring-kafka/docs/current/reference/html/#thread-safety
Something like this:
@SpringBootApplication
public class So71884752Application {
public static void main(String[] args) {
SpringApplication.run(So71884752Application.class, args);
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("topic1").partitions(2).build();
}
@Component
static class MyListener implements ApplicationListener<ConsumerStoppedEvent> {
private static final ThreadLocal<Long> threadLocalState = new ThreadLocal<>();
@KafkaListener(topics = "topic1", groupId = "my-consumer", concurrency = "2")
public void listen() {
long id = Thread.currentThread().getId();
System.out.println("set thread id to ThreadLocal: " + id);
threadLocalState.set(id);
}
@Override
public void onApplicationEvent(ConsumerStoppedEvent event) {
System.out.println("Remove from ThreadLocal: " + threadLocalState.get());
threadLocalState.remove();
}
}
}
So, I have two concurrent listener containers for those two partitions in the topic. Each of them is going to call this my @KafkaListener
method anyway. I store the thread id into the ThreadLocal
. For simple use-case and testing the feature.
The I implement ApplicationListener<ConsumerStoppedEvent>
which is emitted in the appropriate consumer thread. And that one helps me to extract ThreadLocal
value and clean it up in the end of consumer life.
The test against embedded Kafka looks like this:
@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@DirtiesContext
class So71884752ApplicationTests {
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Test
void contextLoads() throws InterruptedException {
this.kafkaTemplate.send("topic1", "1", "foo");
this.kafkaTemplate.send("topic1", "2", "bar");
this.kafkaTemplate.flush();
Thread.sleep(1000); // Give it a chance to consume data
this.kafkaListenerEndpointRegistry.stop();
}
}
Right. It doesn't verify anything, but it demonstrate how that event can happen.
I see something like this in log output:
set thread id to ThreadLocal: 125
set thread id to ThreadLocal: 127
...
Remove from ThreadLocal: 125
Remove from ThreadLocal: 127
So, whatever that doc says is correct.