java concurrency executorservice callable

ExecutorService.awaitTermination() never times out


I'm trying to implement a function in which either the callables finish within a stipulated time or the operation times out. I had hoped that ExecutorService.awaitTermination() would do this, but was surprised to see that it doesn't. The code is below; the run never completes.

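import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class Counter implements Callable<Void> {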

    public static void main(String[] args) throws InterruptedException {
        final Map<String, Counter> map = new HashMap<>();
        map.put("", new Counter());
        final Map<String, Future<Void>> result = executeTasksInParallel(map);
        final Future<Void> voidFuture = result.get("");
        try {
            voidFuture.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Void call() throws Exception {
        for (long i = 0L; i < Long.MAX_VALUE; i++);
        return null;
    }

    public static <K, V> Map<K, Future<V>> executeTasksInParallel(final Map<K, ? extends Callable<V>> callablesById) throws InterruptedException {
        final Map<K, Future<V>> resultFuturesById = new HashMap<>();
        final ExecutorService executorService = Executors.newFixedThreadPool(callablesById.size());
        for (final Map.Entry<K, ? extends Callable<V>> callableByIdEntry : callablesById.entrySet()) {
            final K id = callableByIdEntry.getKey();
            final Callable<V> callable = callableByIdEntry.getValue();
            final Future<V> resultFuture = executorService.submit(callable);
            resultFuturesById.put(id, resultFuture);
        }
        executorService.shutdown();
        executorService.awaitTermination(5L, TimeUnit.SECONDS);
        return resultFuturesById;
    }
}

Am I missing something here? Thanks!

UPDATE:

I tried replacing the contents of the try block with the code below to keep Future.get() from blocking, but that didn't help either:

if (voidFuture.isDone()) {
   voidFuture.get();
}

Solution

    1. Use shutdownNow(), as Joe C has specified. Note that awaitTermination() does time out: it returns false after the 5 seconds. It only waits, though, and never stops running tasks, so the worker thread keeps counting and Future.get() in main() blocks indefinitely.
    2. shutdownNow() only works if the code in call() cooperates, for example by checking whether the current thread has been interrupted. See e.g. this question and its answers for details. You can sometimes do without this explicit check if the loop calls (directly or indirectly) methods that respond to interruption by throwing an InterruptedException (examples are Thread.sleep(...), Object.wait(...) and Future.get(...); blocking operations on a channel that implements InterruptibleChannel fail with a ClosedByInterruptException instead). EDIT: ...provided the exception that is thrown is not suppressed.
    3. And yes, only call get() if the future isDone(). get() runs on the main thread, which is not managed by your executorService and so is not interrupted by shutdownNow(); it would simply block until the task completes.
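
To illustrate point 2 in isolation, here is a minimal sketch of a task that needs no explicit interrupt check because it blocks in Thread.sleep(...). The class name SleepingTaskDemo, the method runWithTimeout and the timings are made up for this example; they are not part of the question's code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; the name and the timings are illustrative only.
class SleepingTaskDemo {

    /** Runs a task that sleeps far longer than the timeout and reports how it ended. */
    static String runWithTimeout(long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Callable<String> sleeper = () -> {
            try {
                Thread.sleep(60_000L); // blocking call that responds to interrupts
                return "finished";
            } catch (InterruptedException e) {
                // sleep() cleared the flag when it threw; restore it, do not swallow it
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        };
        Future<String> future = pool.submit(sleeper);
        pool.shutdown();
        try {
            if (!pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                // Timed out: interrupt the worker, which makes sleep() throw.
                pool.shutdownNow();
            }
            // The task reacts to the interrupt promptly, so a bounded wait is enough.
            return future.get(5L, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            pool.shutdownNow();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(200L));
    }
}
```

The task is not cancelled here; it ends normally after seeing the interrupt, so the Future still delivers a regular result. If the catch block swallowed the exception and carried on sleeping or looping, shutdownNow() would have no visible effect.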

    The final code would be

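    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class Counter implements Callable<Void> {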
    
        public static void main(String[] args) throws InterruptedException {
            final Map<String, Counter> map = new HashMap<>();
            map.put("", new Counter());
            final Map<String, Future<Void>> result = executeTasksInParallel(map);
            final Future<Void> voidFuture = result.get("");
            try {
                if (voidFuture.isDone()) {
                    voidFuture.get();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public Void call() throws Exception {
            for (long i = 0L; i < Long.MAX_VALUE; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    // isInterrupted() does not clear the flag, so it stays set for the caller
                    return null;
                }
                /* or e.g. throw an exception */
            }
            return null;
        }
    
        public static <K, V> Map<K, Future<V>> executeTasksInParallel(
                final Map<K, ? extends Callable<V>> callablesById)
                throws InterruptedException {
            final Map<K, Future<V>> resultFuturesById = new HashMap<>();
            final ExecutorService executorService =
                Executors.newFixedThreadPool(callablesById.size());
            for (final Map.Entry<K, ? extends Callable<V>> callableByIdEntry : callablesById
                .entrySet()) {
                final K id = callableByIdEntry.getKey();
                final Callable<V> callable = callableByIdEntry.getValue();
                final Future<V> resultFuture = executorService.submit(callable);
                resultFuturesById.put(id, resultFuture);
            }
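            executorService.shutdown(); // accept no new tasks; submitted tasks keep running
            if (!executorService.awaitTermination(5L, TimeUnit.SECONDS)) {
                // timed out: interrupt the tasks that are still running
                executorService.shutdownNow();
            }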
            return resultFuturesById;
        }
    }
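
One caveat with the isDone() check: right after shutdownNow() returns, the task may not have noticed the interrupt yet, so isDone() can still be false and the result is silently skipped. A possible alternative, sketched below under my own assumptions, is the bounded Future.get(timeout, unit), which throws a TimeoutException instead of blocking forever. The class name BoundedGetDemo and the 200 ms timeouts are invented for the example; the sketch also records what awaitTermination() returns, to show that it does time out but leaves the task running.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; the name and the short timeouts are illustrative only.
class BoundedGetDemo {

    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Same busy loop as Counter.call(), made cooperative with an interrupt check.
        Callable<Void> counter = () -> {
            for (long i = 0L; i < Long.MAX_VALUE; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    return null;
                }
            }
            return null;
        };
        Future<Void> future = pool.submit(counter);
        pool.shutdown();
        try {
            // Returns false on timeout; the task itself is left running.
            boolean finishedInTime = pool.awaitTermination(200L, TimeUnit.MILLISECONDS);

            String getOutcome;
            try {
                future.get(200L, TimeUnit.MILLISECONDS); // bounded, unlike plain get()
                getOutcome = "completed";
            } catch (TimeoutException e) {
                getOutcome = "timed out";
            }

            pool.shutdownNow(); // interrupts the worker so the loop can exit
            boolean terminated = pool.awaitTermination(5L, TimeUnit.SECONDS);

            return "awaitTermination=" + finishedInTime
                    + ", get=" + getOutcome
                    + ", terminatedAfterShutdownNow=" + terminated;
        } catch (InterruptedException | ExecutionException e) {
            pool.shutdownNow();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main should print awaitTermination=false, get=timed out, terminatedAfterShutdownNow=true: both waits give up on schedule, and the pool only terminates once the task has been interrupted and has cooperated.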