As native KafkaConsumer is not thread safe, so it is discouraged to call pause and resume methods from different thread instead of kafka-consumer processing thread. but as spring-kafka provides another layer KafkaMessageListenerContainer which internally use kafka-consumer. So my question is can we use KafkaListenerEndpointRegistry to get the listener container by id and call resume or pause method from other thread rather than consumer processing thread.
kafkaListenerEndpointRegistry.getListenerContainer("id").pause();
ExecutorService executorService = newFixedThreadPool(2);
executorService.submit(()->{
System.out.println("CurrentThread: {}" + Thread.currentThread().getId()+ " " + Thread.currentThread().getName());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaListenerEndpointRegistry.getListenerContainer("id").resume();
});
Yes; container.pause()
sets a flag to tell the Consumer
thread to pause before its next poll()
call. Similarly, resume()
resets the flag so the consumer thread will resume the Consumer
before the next poll.