Search code examples
springmultithreadingblockingqueue

blockingqueue and Spring - how do I start thread pool at startup?


I have a spring application that use blockingqueue to run producer-consumer design. Basically when user makes an API call using REST controller, it adds a work to blockingqueue and background thread will consume the queue as soon as it's arrived.

I see that Spring recommends to use it's TaskExecutor, so I have the following class. ThreadConfig.java

@Configuration
public class ThreadConfig {

  @Bean
  public TaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.setThreadNamePrefix("default_task_executor_thread");
    executor.initialize();
    return executor;
  }

}

I also have a consumer component of which watches the queue and run the task. MessageConsumer.java

@Component
public class MessageConsumer implements Runnable{


  private final BlockingQueue<String> queue;

  MessageConsumer(BlockingQueue<String> queue){
    this.queue = queue;
  }

  public void run(){
    try {
      while (true) {
        String str = queue.take();
        // Do something
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

}

And now, I am not sure how to start threadpool when spring application starts.

Do I simply add the code in Main?

Any help would be appreciated. Thank you


Solution

  • I think you are overthinking it. Internally ThreadPoolTaskExecutor also uses BlockingQueue to hand work to worker Threads from the ThreadPool. Since you have not changed default queueCapacity it will be LinkedBlockingQueue.

    In your scenario it would be easier to make it this way :

    • since every HTTP request is handled by separate Thread, the producer will be the Thread that handles HTTP request
    • the consumer will be the worker Thread from the ThreadPoolTaskExecutor.

    So to make the producer-consumer work, using just ThreadPoolTaskExecutor just create a task and submit it to your ThreadPool and one of the workers will consume it :

    @RestController
    public class MyController {
    
        private final TaskExecutor taskExecutor;
    
        @Autowired
        public MyController(final TaskExecutor taskExecutor) {
            this.taskExecutor = taskExecutor;
        }
    
        @GetMapping("/test/{value}")
        public ResponseEntity<String> get(final @PathVariable("value") String value) {
            taskExecutor.execute(() -> {
    
                System.out.println(value);
                // do something with your String
                // this will be executed by some worker Thread
            });
    
            return ResponseEntity.ok(value);
        }
    }
    

    And also since you are using ThreadPoolTaskExecutor and it implements DisposableBean interface (through extending ExecutorConfigurationSupport) - you do not have to shutdown the pool explicitly. Spring will do this for you when Spring Context will be destroyed and destroy method of this interface will be invoked on the pool Bean.