Search code examples
javajava-streamjava.util.concurrentthreadpoolexecutor

Java ThreadPoolExexecutor using streams and Callables


I have a class which implements Callable, and it has a method which overrides call and returns a Long.

I create a List of Callable<Long> as

List<Callable<Long>> callables = new ArrayList<>();
for (File fileEntry : folder.listFiles()) {
    callables.add(new DataProcessor(fileEntry));

I have

ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10);

and I call

threadPoolExecutor.invokeAll(callables)
    .stream()
    .map(future -> {
        try {
            return future.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        })
        .collect(Collectors.toLong(/* what goes here? */));

What I want to do is sum all the return values from future.get().

Also, since I am calling the invokeAll, do I still need to do a shutdown on the Executor?


Solution

  • You can use Stream.mapToLong to map the future.get as LongStream and then find the sum of the stream as:

    long sum = threadPoolExecutor.invokeAll(callables)
                .stream()
                .mapToLong(future -> {
                    try {
                        return future.get();
                    } catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                }) // LongStream
                .sum(); // sum of the stream
    

    Note: This simplifies stream API call chains using Collectors.summingLong. It allows avoiding the creation of redundant temporary objects when traversing a collection.

    Aside: You can also collect your Callables as :

    List<Callable<Long>> callables = fileList.stream()
                                             .map(fileEntry -> new DataProcessor(fileEntry))
                                             .collect(Collectors.toList());
    

    since I am calling the invokeAll, do I still need to do a shutdown on the Executor?

    Yes, you would have to shut down the ExecutorService. You can also confirm the status of the same using isShutDown() API as :

    System.out.println(threadPoolExecutor.isShutdown()); // would return false