java asynchronous java-8 completable-future

Parallel processing using a collection of CompletableFuture supplyAsync tasks, then collecting the results


// Unit of logic that I want to run in parallel
public PagesDTO convertOCRStreamToDTO(String pageId, Integer pageSequence) throws Exception {
    LOG.info("Get OCR begin for pageId [{}] thread name {}",pageId, Thread.currentThread().getName());
    OcrContent ocrContent = getOcrContent(pageId);
    OcrDTO ocrData = populateOCRData(ocrContent.getInputStream());
    PagesDTO pageDTO = new PagesDTO(pageId, pageSequence.toString(), ocrData);
    return pageDTO; 
}

Logic to execute convertOCRStreamToDTO(..) in parallel and collect the results once each thread has finished:

List<PagesDTO> pageDTOList = new ArrayList<>();
//javadoc: Creates a work-stealing thread pool using all available processors as its target parallelism level.
ExecutorService newWorkStealingPool = Executors.newWorkStealingPool(); 
Instant start = Instant.now();
List<CompletableFuture<PagesDTO>> pendingTasks = new ArrayList<>();
List<CompletableFuture<PagesDTO>> completedTasks = new ArrayList<>();
CompletableFuture<PagesDTO> task = null;

for (InputPageDTO dcInputPageDTO : dcReqDTO.getPages()) {
    String pageId = dcInputPageDTO.getPageId();
    task = CompletableFuture
            .supplyAsync(() -> {
                try {
                    return convertOCRStreamToDTO(pageId, pageSequence.getAndIncrement());
                } catch (HttpHostConnectException | ConnectTimeoutException e) {
                    LOG.error("Error connecting to Redis for pageId [{}]", pageId, e);
                    CaptureException e1 = new CaptureException(Error.getErrorCodes().get(ErrorCodeConstants.REDIS_CONNECTION_FAILURE),
                            " Connecting to the Redis failed while getting OCR for pageId ["+pageId +"] " + e.getMessage(), CaptureErrorComponent.REDIS_CACHE, e);
                    exceptionMap.put(pageId,e1);
                } catch (CaptureException e) {
                    LOG.error("Error in Document Classification Engine Service while getting OCR for pageId [{}]",pageId,e);
                    exceptionMap.put(pageId,e);
                } catch (Exception e) {
                    LOG.error("Error getting OCR content for the pageId [{}]", pageId,e);
                    CaptureException e1 = new CaptureException(Error.getErrorCodes().get(ErrorCodeConstants.TECHNICAL_FAILURE),
                            "Error while getting ocr content for pageId : ["+pageId +"] " + e.getMessage(), CaptureErrorComponent.REDIS_CACHE, e);
                    exceptionMap.put(pageId,e1);
                }
                return null;
            }, newWorkStealingPool);
    //collect all async tasks
    pendingTasks.add(task);
}

// TODO: How can I avoid this busy-wait loop, which exists only to wait for the future tasks to complete?
// TODO: Looking for the best solution
while(pendingTasks.size() > 0) {
    for(CompletableFuture<PagesDTO> futureTask: pendingTasks) {
        if(futureTask != null && futureTask.isDone()){
            completedTasks.add(futureTask);
            pageDTOList.add(futureTask.get());
        }
    }
    pendingTasks.removeAll(completedTasks);
}

// Throw the exception caught while converting the OCR stream to a DTO, for any of the pageIds
for(InputPageDTO dcInputPageDTO : dcReqDTO.getPages()) {
    if(exceptionMap.containsKey(dcInputPageDTO.getPageId())) {
        CaptureException e = exceptionMap.get(dcInputPageDTO.getPageId());
        throw e;
    }
}

LOG.info("Parallel processing time taken for {} pages = {}", dcReqDTO.getPages().size(),
        org.springframework.util.StringUtils.deleteAny(Duration.between(start, Instant.now()).toString().toLowerCase(), "pt"));

Please look at the TODO items in the code above. I have two concerns on which I am looking for advice:

1) I want to avoid the unnecessary looping (the while loop above). What is the best way to wait for all threads to complete their async execution and then collect the results?

2) The ExecutorService instance is created at the service bean class level, on the assumption that it will be reused for every request, instead of being created locally in the method and shut down in a finally block. Is this the right approach, or is there a flaw in my reasoning?


Solution

  • Simply remove the while and the if and you are good:

    for(CompletableFuture<PagesDTO> futureTask: pendingTasks) {
        completedTasks.add(futureTask);
        pageDTOList.add(futureTask.get());
    }
    

    get() (as well as join()) will wait for the future to complete before returning a value. Also, there is no need to test for null since your list will never contain any.

    You should however probably change the way you handle exceptions. CompletableFuture has a specific mechanism for handling them and rethrowing them when calling get()/join(). You might simply want to wrap your checked exceptions in CompletionException.
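
The two points above (blocking on join() instead of polling, and wrapping checked exceptions in CompletionException) can be combined roughly as follows. This is only a minimal sketch, not the original service code: the class ParallelPagesSketch and the methods convert and convertAll are hypothetical stand-ins, with convert playing the role of convertOCRStreamToDTO and a String result standing in for PagesDTO.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ParallelPagesSketch {

    // Hypothetical stand-in for convertOCRStreamToDTO: it may throw a checked exception.
    static String convert(String pageId) throws Exception {
        if (pageId.isEmpty()) {
            throw new Exception("no OCR content for empty pageId");
        }
        return pageId.toUpperCase();
    }

    // Submits one task per page, then waits for each task in submission order.
    static List<String> convertAll(List<String> pageIds, ExecutorService pool) {
        List<CompletableFuture<String>> pendingTasks = new ArrayList<>();
        for (String pageId : pageIds) {
            pendingTasks.add(CompletableFuture.supplyAsync(() -> {
                try {
                    return convert(pageId);
                } catch (Exception e) {
                    // Wrap the checked exception so the future completes exceptionally.
                    throw new CompletionException(e);
                }
            }, pool));
        }

        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> task : pendingTasks) {
            // join() blocks until the task is done; a failed task rethrows
            // the CompletionException here, with the original exception as its cause.
            results.add(task.join());
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            System.out.println(convertAll(Arrays.asList("p1", "p2", "p3"), pool));
            try {
                convertAll(Arrays.asList("p1", ""), pool);
            } catch (CompletionException e) {
                System.out.println("failed: " + e.getCause().getMessage());
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

With this shape there is no need for a shared exceptionMap or a null return value: the first failed task surfaces at join(), and the caller can inspect getCause() to decide which CaptureException to throw.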