ExecutorService.invokeAll and shutdown


I have some Callable tasks, sensitive to interruption, which I submit to an ExecutorService using invokeAll. Five seconds later, from another method, I call executorService.shutdownNow and then awaitTermination, which returns true, so all seems good. The problem is that the executor never seems to finish: invokeAll never returns.

Thanks to logging, I know that every one of my tasks finished. Nevertheless, invokeAll still blocks on f.get() when i equals the number of threads in the executor.

The following code is taken from AbstractExecutorService, with some logging added:

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        if (tasks == null) throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            List<Callable<T>> list = new ArrayList<Callable<T>>();
            for (Callable<T> t : tasks) {
                list.add(t);
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    log.info("Future %s is not done!. Task %s", i, list.get(i));
                    try {
                        log.info("Get from future %s", i);
                        // NEXT LINE BLOCKS FOR i= NUMBER OF THREADS
                        f.get();
                        log.info("Got result from future %s", i);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            log.info("Obtained all!");
            done = true;
            return futures;
        } finally {
            if (!done) for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

Am I not supposed to use invokeAll together with shutdown? I would have thought it was fine, since they belong to the same class. And why does it block only when i equals the number of threads in the executor?


Solution

  • Yes, you are not supposed to use invokeAll with shutdown. At least that is my understanding; correct me if I'm wrong.

    • The shutdownNow method (from ThreadPoolExecutor):
    public List<Runnable> shutdownNow() {
    ...
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
    ...
    }
    

    All it does is interrupt the worker threads and remove the remaining runnables from the work queue (see drainQueue). Neither shutdownNow nor shutdown modifies the futures held by our invokeAll method.

    So what happens in my case is this: for an executor with N threads, I submit 300 jobs, each taking more than 1 minute. After 5 seconds I call shutdownNow, which interrupts the N working threads, so the tasks behind futures 0 to N-1 finish. What happens to the rest of the futures? Nothing. Their tasks were drained from the queue and will never run, so nothing ever completes them. The next call to f.get() (see the marked line in the question) blocks and you are stuck there. This explains why I always block at i = number of threads.
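    One possible way out, sketched below, relies on the fact that invokeAll passes its futures directly to execute(). The Runnables returned by shutdownNow are therefore normally the same objects that invokeAll is waiting on, and cancelling them makes the pending f.get() calls throw CancellationException, which invokeAll ignores. The class name InvokeAllShutdownDemo and the helper shutdownNowAndCancel are hypothetical names chosen for illustration, and the sketch assumes a plain pool from Executors.newFixedThreadPool rather than a wrapped or custom executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllShutdownDemo {

    // Hypothetical helper: stops the pool and cancels every task that was
    // still queued, so that threads blocked in get() on those futures wake up.
    static int shutdownNowAndCancel(ExecutorService pool) {
        int cancelled = 0;
        for (Runnable queued : pool.shutdownNow()) {
            // Assumption: the drained Runnables are the FutureTask objects
            // created by invokeAll (true for a plain ThreadPoolExecutor).
            if (queued instanceof Future) {
                ((Future<?>) queued).cancel(false);
                cancelled++;
            }
        }
        return cancelled;
    }

    // Returns true if invokeAll returned and the pool terminated.
    static boolean runDemo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            // Interruption-sensitive task that would otherwise run for a minute.
            tasks.add(() -> { Thread.sleep(60_000); return "done"; });
        }

        // invokeAll blocks its caller, so run it on a separate thread.
        Thread caller = new Thread(() -> {
            try {
                pool.invokeAll(tasks);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        caller.setDaemon(true);
        caller.start();

        Thread.sleep(500);            // give invokeAll time to submit everything
        shutdownNowAndCancel(pool);   // interrupt running tasks, cancel queued ones
        caller.join(5_000);           // wait for invokeAll to return

        return !caller.isAlive() && pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("invokeAll returned: " + runDemo());
    }
}
```

    Calling plain pool.shutdownNow() instead of the helper in this sketch should reproduce the hang described above: the two running tasks are interrupted, the eight queued ones are dropped, and the caller stays blocked in invokeAll.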