Search code examples
javamultithreadingconcurrencyexecutorservicejava.util.concurrent

ExecutorCompletionService? Why do need one if we have invokeAll?


If we use an ExecutorCompletionService we can submit a series of tasks as Callables and get the result interacting with the CompletionService as a queue.

But there is also the invokeAll of ExecutorService that accepts a Collection of tasks and we get a list of Future to retrieve the results.

As far as I can tell, there is no benefit in using one or over the other (except that we avoid a for loop using an invokeAll that we would have to submit the tasks to the CompletionService) and essentially they are the same idea with a slight difference.

So why are there 2 different ways to submit a series of tasks? Am I correct that performance wise they are equivalent? Is there a case that one is more suitable than the other? I can't think of one.


Solution

  • Using a ExecutorCompletionService.poll/take, you are receiving the Futures as they finish, in completion order (more or less). Using ExecutorService.invokeAll, you do not have this power; you either block until are all completed, or you specify a timeout after which the incomplete are cancelled.


    static class SleepingCallable implements Callable<String> {
    
      final String name;
      final long period;
    
      SleepingCallable(final String name, final long period) {
        this.name = name;
        this.period = period;
      }
    
      public String call() {
        try {
          Thread.sleep(period);
        } catch (InterruptedException ex) { }
        return name;
      }
    }
    

    Now, below I will demonstrate how invokeAll works:

    final ExecutorService pool = Executors.newFixedThreadPool(2);
    final List<? extends Callable<String>> callables = Arrays.asList(
        new SleepingCallable("quick", 500),
        new SleepingCallable("slow", 5000));
    try {
      for (final Future<String> future : pool.invokeAll(callables)) {
        System.out.println(future.get());
      }
    } catch (ExecutionException | InterruptedException ex) { }
    pool.shutdown();
    

    This produces the following output:

    C:\dev\scrap>java CompletionExample
    ... after 5 s ...
    quick
    slow
    

    Using CompletionService, we see a different output:

    final ExecutorService pool = Executors.newFixedThreadPool(2);
    final CompletionService<String> service = new ExecutorCompletionService<String>(pool);
    final List<? extends Callable<String>> callables = Arrays.asList(
        new SleepingCallable("slow", 5000),
        new SleepingCallable("quick", 500));
    for (final Callable<String> callable : callables) {
      service.submit(callable);
    }
    pool.shutdown();
    try {
      Future<String> future;
      do {
        future = pool.isTerminated() ? service.poll() : service.take();
        if (future != null) {
          System.out.println(future.get());
        }
      } while (future != null);
    } catch (ExecutionException | InterruptedException ex) { }
    

    This produces the following output:

    C:\dev\scrap>java CompletionExample
    ... after 500 ms ...
    quick
    ... after 5 s ...
    slow
    

    Note the times are relative to program start, not the previous message.


    Test out a working example on Replit.