
Program doesn't stop running even though executor was shutdown and futures cancelled


I wrote a small program to practise with Executors and Threads. It does the following:

  1. Create a fixed thread pool of size 3 with an unbounded queue.
  2. Submit 3 tasks that each run an infinite loop (while(true)) to the pool, so all threads are occupied.
  3. Submit a 4th task, which waits in the queue.
  4. Call executor.shutdown() and print the active count and task count.
  5. Set the flag to false to stop the infinite loop, then print the active count and task count again.
  6. Cancel all futures with mayInterruptIfRunning=true, then print the active count and task count again.

This is the code:

public class Main {


private static ThreadPoolExecutor fixexThreadPool;

public static void main(String[] args) throws InterruptedException {
    System.out.println("Creating fixed thread pool of size 3 and infinite queue.");
    fixexThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    final Boolean[] flag = new Boolean[1];
    flag[0] = true;
    List<Future> futures = new ArrayList<>();

    System.out.println("Submiting 3 threads");
    for (int i = 0; i < 3; i++) {
        futures.add(fixexThreadPool.submit(() -> {
            int a = 1;
            while (flag[0]) {
                a++;
            }
            System.out.println("Finishing thread execution.");
        }));
    }
    System.out.println("Done submiting 3 threads.");
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    Thread.sleep(3000L);
    System.out.println("Submitting a 4th thread.");

    futures.add(fixexThreadPool.submit(() -> {
        int a = 1;
        while (flag[0]) {
            a++;
        }
        System.out.println("Finishing thread execution");
    }));

    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));

    System.out.println("Executor shutdown");
    fixexThreadPool.shutdown();
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    Thread.sleep(2000L);
    System.out.println("Setting flag to false.");
    flag[0] = false;
    Thread.sleep(2000L);
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    System.out.println("Cancelling all futures.");
    futures.forEach(f -> f.cancel(true));
    System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
}

}

This is the output of the execution:

  • Creating fixed thread pool of size 3 and infinite queue.
  • Submiting 3 threads
  • Done submiting 3 threads.
  • Active count: 3 | Completed task count: 0 | task count: 3
  • Submitting a 4th thread.
  • Active count: 3 | Completed task count: 0 | task count: 4
  • Executor shutdown
  • Active count: 3 | Completed task count: 0 | task count: 4
  • Setting flag to false.
  • Active count: 3 | Completed task count: 0 | task count: 4
  • Cancelling all futures.
  • Active count: 3 | Completed task count: 0 | task count: 4

There are a couple of things I don't understand.

  1. Why are there still active threads after shutting down the executor?
  2. Why doesn't the infinite while loop break after I set the flag to false?
  3. Why are there still active threads after cancelling every future?
  4. No matter whether I set the flag to false, shut down the executor, or cancel all futures, my program doesn't stop running. Why is that?

Thanks in advance!


Solution

  • Why are there still active threads after shutting down the executor?

    Invoking shutdown requests an orderly shutdown: the thread pool stops accepting new tasks but lets the already submitted tasks, running or queued, run to completion. The call itself returns immediately and doesn't try to stop the threads. Conceptually, ExecutorService implementations separate task submission from task execution. In other words, we own the tasks but not the threads.
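
    The behaviour described above can be sketched as follows. This is a minimal illustration, not the asker's program: the class name ShutdownDemo, the single-thread pool, and the 100 ms sleep are assumptions chosen to keep the run short. It shows that shutdown() returns at once while the queued tasks still get to run, and that awaitTermination is what actually blocks until they are done.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original program.
class ShutdownDemo {

    // Returns how many tasks ran to completion even though
    // shutdown() was called while they were running or queued.
    static int runAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        AtomicInteger finished = new AtomicInteger();

        // With a single worker, the second and third task wait in the queue.
        for (int i = 0; i < 3; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                finished.incrementAndGet();
            });
        }

        // Returns immediately; only new submissions are rejected from now on.
        pool.shutdown();
        System.out.println("isShutdown=" + pool.isShutdown()
                + " | isTerminated=" + pool.isTerminated());

        try {
            // Blocks until every submitted task has finished (or the timeout hits).
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks finished after shutdown: " + runAfterShutdown());
    }
}
```

    In the question's program the tasks never finish, so the pool can never move from "shut down" to "terminated", which is why the active count stays at 3.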

  • Why doesn't the infinite while loop break after changing the flag to false?

    The task uses a cancellation flag. Under the Java memory model, a change to shared data (here, flag) is only guaranteed to become visible to other threads if the accesses are synchronized or the variable is volatile. Synchronization provides both mutual exclusion and memory visibility; volatile provides visibility only. Without either, the JIT compiler may legally hoist the read of flag[0] out of the loop, so the worker never re-reads it. Since only the main thread modifies the flag here, visibility is all we need, and defining flag as a static volatile variable makes the loops exit in practice. Strictly speaking, volatile on an array field covers the reference but not the elements, so a plain volatile boolean or an AtomicBoolean is the more robust choice.

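    public class Main {

        private static ThreadPoolExecutor fixexThreadPool;
        // Shared with the worker threads, so it is declared volatile.
        private static volatile Boolean[] flag = new Boolean[1];

        // main method as before, minus the local declaration of flag
        // (it must still set flag[0] = true before submitting tasks)
    }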
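
    As a variation on the snippet above, here is a minimal sketch that uses a scalar volatile boolean instead of an array. The class name VolatileFlagDemo, the field name running, and the 200 ms delay are assumptions made for the example. Every read and write of the field is a volatile access, so the worker is guaranteed to see the change.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original program.
class VolatileFlagDemo {

    // volatile guarantees that the worker sees the write made by the main thread.
    private static volatile boolean running = true;

    // Returns true if the pool terminated after the flag was cleared.
    static boolean stopsAfterFlagCleared() {
        running = true;
        ExecutorService pool = Executors.newFixedThreadPool(1);

        pool.submit(() -> {
            long a = 0;
            while (running) {
                a++;
            }
            System.out.println("Finishing task execution.");
        });

        // No new tasks; the running one is allowed to finish.
        pool.shutdown();
        try {
            Thread.sleep(200);   // let the task spin for a moment
            running = false;     // volatile write, visible to the worker
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Pool terminated: " + stopsAfterFlagCleared());
    }
}
```

    An AtomicBoolean would work the same way; it is the better fit when the flag has to be captured as a local variable in a lambda, as in the question.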
    

  • Why are there still active threads after cancelling every future?

    ThreadPoolExecutor uses the FutureTask class as its default Future implementation, and FutureTask.cancel(true) interrupts the worker thread that is running the task. So one might expect the interrupt to stop the task and even terminate the thread. But thread interruption is a cooperative mechanism. First, the task must be responsive to interrupts: it must check the interruption status with Thread.isInterrupted and exit when it is set. Second, the thread owner, in our case the thread pool, decides what an interrupt means for the thread itself. In this example the task isn't responsive to interrupts, since it only checks a flag, so it keeps running. And ThreadPoolExecutor doesn't treat a cancellation interrupt as a reason to terminate the worker thread.
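
    To illustrate both halves of that cooperation, here is a small sketch under stated assumptions: the class name CancelDemo and the latches used for coordination are invented for the example. The first task checks its interruption status, so cancel(true) ends it; the pool then reuses the very same worker thread for a second task, which shows that the interrupt stopped the task and not the thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original program.
class CancelDemo {

    // Returns true if the cancelled task exited and the same pool thread
    // went on to run a second task.
    static boolean cancelStopsTaskButNotThread() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);
        AtomicReference<String> firstThread = new AtomicReference<>();
        try {
            Future<?> looping = pool.submit(() -> {
                firstThread.set(Thread.currentThread().getName());
                started.countDown();
                long a = 0;
                // The task cooperates: it leaves the loop once it is interrupted.
                while (!Thread.currentThread().isInterrupted()) {
                    a++;
                }
                exited.countDown();
            });

            started.await();          // make sure the task is really running
            looping.cancel(true);     // interrupts the worker thread
            boolean taskExited = exited.await(5, TimeUnit.SECONDS);

            // The worker thread survived the interrupt and picks up new work.
            Future<String> next = pool.submit(() -> Thread.currentThread().getName());
            String secondThread = next.get(5, TimeUnit.SECONDS);

            return taskExited && looping.isCancelled()
                    && secondThread.equals(firstThread.get());
        } catch (Exception e) {
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Task stopped, thread reused: " + cancelStopsTaskButNotThread());
    }
}
```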

  • No matter whether I set the flag to false, shut down the executor, or cancel all futures, my program doesn't stop running. Why is that?

    As mentioned above, the task uses the cancellation flag approach, and the pool's worker threads are non-daemon threads, so the JVM stays alive for as long as a task keeps looping. Making the flag visible with volatile should solve the problem: the tasks stop as intended, and after shutdown the pool terminates once they finish. Cancellation, on the other hand, has no effect on the current task because it never checks the interruption status. We can make it responsive like this:

    while (flag[0] && !Thread.currentThread().isInterrupted())
    

    This way, the task also supports the cancellation by a thread interrupt.
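
    Putting the pieces together, the scenario from the question could look roughly like the sketch below. It is an illustration under assumptions, not the asker's exact program: the class name FixedMain and the method runAndTerminate are invented, the flag is a scalar volatile boolean rather than an array, and the intermediate printouts are left out. Here the flag is never cleared; cancellation alone ends the three running tasks, and the queued fourth task is cancelled before it ever starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical reworking of the question's Main class.
class FixedMain {

    private static volatile boolean flag = true;

    // Returns true if the pool reached the terminated state.
    static boolean runAndTerminate() {
        flag = true;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        List<Future<?>> futures = new ArrayList<>();

        // Three tasks occupy the workers; the fourth waits in the queue.
        for (int i = 0; i < 4; i++) {
            futures.add(pool.submit(() -> {
                long a = 0;
                // Exit on either the flag or an interrupt.
                while (flag && !Thread.currentThread().isInterrupted()) {
                    a++;
                }
            }));
        }

        pool.shutdown();                        // stop accepting new tasks
        futures.forEach(f -> f.cancel(true));   // interrupt running, skip queued
        try {
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            System.out.println("Active count: " + pool.getActiveCount()
                    + " | terminated: " + terminated);
            return terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        runAndTerminate();
        // Once the pool has terminated, no non-daemon worker is left and the JVM can exit.
    }
}
```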