I have a Spring application that uses a BlockingQueue to implement a producer-consumer design. When a user makes an API call through a REST controller, it adds a work item to the BlockingQueue, and a background thread consumes the item as soon as it arrives.
I see that Spring recommends using its TaskExecutor, so I have the following class. ThreadConfig.java
@Configuration
public class ThreadConfig {

    @Bean
    public TaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(4);
        executor.setThreadNamePrefix("default_task_executor_thread");
        executor.initialize();
        return executor;
    }
}
I also have a consumer component that watches the queue and runs the tasks. MessageConsumer.java
@Component
public class MessageConsumer implements Runnable {

    private final BlockingQueue<String> queue;

    MessageConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            while (true) {
                String str = queue.take();
                // Do something
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Now I am not sure how to start the thread pool when the Spring application starts.
Do I simply add the code in Main?
Any help would be appreciated. Thank you
I think you are overthinking it. Internally, ThreadPoolTaskExecutor also uses a BlockingQueue to hand work to worker Threads from the ThreadPool. Since you have not changed the default queueCapacity, it will be a LinkedBlockingQueue.
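To illustrate the point about the internal queue, here is a minimal plain-JDK sketch, not Spring's actual implementation. It assumes a java.util.concurrent.ThreadPoolExecutor configured roughly the way your bean is (4 core threads, 4 max threads, an unbounded LinkedBlockingQueue). The class name ExecutorQueueSketch and the methods newPool and process are made up for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that a thread pool already is a producer-consumer setup.
class ExecutorQueueSketch {

    // Fixed pool of 4 workers backed by an unbounded LinkedBlockingQueue
    // (assumed to mirror corePoolSize = maxPoolSize = 4 with the default capacity).
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(4, 4, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    // Producer side: the calling thread hands each message to the pool.
    // Consumer side: the pool's worker threads run the submitted tasks.
    static List<String> process(List<String> messages) {
        ThreadPoolExecutor pool = newPool();
        List<String> handled = new CopyOnWriteArrayList<>();
        for (String message : messages) {
            pool.execute(() -> handled.add(message.toUpperCase()));
        }
        pool.shutdown(); // stop accepting new work, let submitted tasks finish
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled; // completion order is not guaranteed
    }

    public static void main(String[] args) {
        // The pool's work queue is the BlockingQueue you would otherwise manage yourself.
        System.out.println(newPool().getQueue().getClass().getSimpleName());
        System.out.println(process(List.of("a", "b", "c")).size());
    }
}
```

The queue you were going to create by hand is already there inside the pool, so the only thing left for your own code is to submit tasks.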
In your scenario it would be easier to do it this way: the producer will be the Thread that handles the HTTP request, and the consumer will be a worker Thread from the ThreadPoolTaskExecutor.
So to make producer-consumer work using just the ThreadPoolTaskExecutor, create a task and submit it to your ThreadPool, and one of the workers will consume it:
@RestController
public class MyController {

    private final TaskExecutor taskExecutor;

    @Autowired
    public MyController(final TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @GetMapping("/test/{value}")
    public ResponseEntity<String> get(final @PathVariable("value") String value) {
        taskExecutor.execute(() -> {
            System.out.println(value);
            // do something with your String
            // this will be executed by some worker Thread
        });
        return ResponseEntity.ok(value);
    }
}
Also, since you are using ThreadPoolTaskExecutor, which implements the DisposableBean interface (by extending ExecutorConfigurationSupport), you do not have to shut down the pool explicitly. Spring will do this for you: when the Spring context is destroyed, the destroy method of this interface is invoked on the pool bean.
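One caveat about that automatic shutdown: as far as I know, unless you call setWaitForTasksToCompleteOnShutdown(true) (optionally together with setAwaitTerminationSeconds) on the ThreadPoolTaskExecutor, the pool is stopped without waiting for tasks that are still queued. The plain-JDK sketch below only illustrates what such an immediate stop means for queued work, using ExecutorService.shutdownNow(); it is not Spring's code, and the class ShutdownSketch and method droppedOnShutdownNow are invented for the example.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: counts the tasks that never run after an immediate shutdown.
class ShutdownSketch {

    // Returns how many queued tasks shutdownNow() handed back unexecuted.
    static int droppedOnShutdownNow(int queuedTasks) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);

        // First task occupies the only worker until it is interrupted.
        pool.execute(() -> {
            started.countDown();
            try {
                never.await(); // stands in for long-running work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdownNow() interrupts the worker
            }
        });

        // These tasks sit in the pool's queue behind the blocked one.
        for (int i = 0; i < queuedTasks; i++) {
            pool.execute(() -> { });
        }

        try {
            started.await(); // make sure the worker is busy before shutting down
            List<Runnable> dropped = pool.shutdownNow();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return dropped.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(droppedOnShutdownNow(3));
    }
}
```

If losing queued work on application shutdown matters to you, that is the situation the two setters mentioned above are meant for.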