Search code examples
javamultithreadingjava-21project-loom

ExecutionCompletionService hangs when used with Project loom


I am trying to use Project Loom to enable invoking a REST server in parallel. In the code snippet below, when I use virtual threads the executorService hangs, but I do not see the same behavior with traditional system threads. I have around 50000 API calls to be made, and are processed in chunks with size of 500.

After getting around 490+ responses using take() method, ExecutorService gets stuck/hangs (but with lower chunk size, it doesn't hang).

What could be the reason and why it's failing?

public List<ApiResponseWrapper> sendRequestToApi(List<ApiRequest> apiRequests) throws ExecutionException, InterruptedException {
    ThreadFactory factory = Thread.ofVirtual().factory();
    List<ApiResponseWrapper> apiResponseWrappers = new ArrayList<>();
    final ExecutorService pool = Executors.newFixedThreadPool(7, factory);
    final CompletionService<ApiEntityResponseWrapper> service = new ExecutorCompletionService<>(pool);
    List<RestTemplateClient> webClientCallables = apiRequests.stream().map(ApiRequest -> new RestTemplateClient(ApiRequest,restTemplate)).toList();
    webClientCallables.forEach(service::submit);
    pool.shutdown();
    int i = 0;
    try{
        for(RestTemplateClient webClient : webClientCallables){
            ApiResponseWrapper apiEntityResponseWrapper = service.take().get();
            apiResponseWrappers.add(apiEntityResponseWrapper);
            log.info("The size in webclient callables are reduced by {}", webClientCallables.size()-i);
            i++;
        }
    }
    catch (InterruptedException | ExecutionException | TibcoClientException ee){
        log.error("An exception occurred while processing requests towards Tibco ", ee.getCause());
        throw ee;
    }
    return apiResponseWrappers;
}

Solution

  • Comments provided by @Holger is indeed correct and well supported. Summarizing the approach I took based on his/her comments.

    1. Did away with CompletionService for invokeAll.
    2. Used system threads instead of virtual.
    3. Introduced Semaphores as it is more aligned to what I intend to achieve.