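I've written a small program for practising with Executors and Threads. It consists of the following:
1. Create a fixed thread pool of size 3 with an unbounded queue.
2. Submit 3 tasks with an infinite loop (while(true)) to the pool (then all threads are occupied).
3. Submit a 4th task, which has to wait in the queue.
4. Shut down the executor, and then do a println to see how many active tasks and what task count I have.
5. Set the flag to false to break the infinite while, and then do a println to see how many active tasks and what task count I have.
6. Cancel all futures with mayInterruptIfRunning=true, and then do a println to see how many active tasks and what task count I have.
This is the code: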
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    private static ThreadPoolExecutor fixexThreadPool;

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Creating fixed thread pool of size 3 and infinite queue.");
        fixexThreadPool = new ThreadPoolExecutor(3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        final Boolean[] flag = new Boolean[1];
        flag[0] = true;
        List<Future<?>> futures = new ArrayList<>();
        System.out.println("Submiting 3 threads");
        for (int i = 0; i < 3; i++) {
            futures.add(fixexThreadPool.submit(() -> {
                int a = 1;
                while (flag[0]) {
                    a++;
                }
                System.out.println("Finishing thread execution.");
            }));
        }
        System.out.println("Done submiting 3 threads.");
        System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
        Thread.sleep(3000L);
        System.out.println("Submitting a 4th thread.");
        futures.add(fixexThreadPool.submit(() -> {
            int a = 1;
            while (flag[0]) {
                a++;
            }
            System.out.println("Finishing thread execution");
        }));
        System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
        System.out.println("Executor shutdown");
        fixexThreadPool.shutdown();
        System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
        Thread.sleep(2000L);
        System.out.println("Setting flag to false.");
        flag[0] = false;
        Thread.sleep(2000L);
        System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
        System.out.println("Cancelling all futures.");
        futures.forEach(f -> f.cancel(true));
        System.out.println(String.format("Active count: %s | Completed task count: %s | task count: %s", fixexThreadPool.getActiveCount(), fixexThreadPool.getCompletedTaskCount(), fixexThreadPool.getTaskCount()));
    }
}
This is the output of the execution:
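There are a couple of things I don't understand:
1. Why, after shutting down the executor, are there still active threads?
2. Why, after changing the flag to false in order to break the infinite loop, doesn't the infinite while break?
3. Why, after cancelling every future, are there still active threads?
4. No matter if I change the flag to false, shut down the executor, or even cancel all futures, my program doesn't stop running. Why is that?
Thanks in advance!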
Why, after shutting down the executor, are there still active threads?
By invoking shutdown, we request a normal (orderly) shutdown. The thread pool stops accepting new tasks but lets the already submitted tasks, running or queued, run to completion. The call itself doesn't block and doesn't try to stop the threads; to wait for completion we would call awaitTermination. Conceptually, the ExecutorService implementations separate task submission from task execution. In other words, we own the tasks but not the threads.
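To see this behaviour in isolation, here is a minimal sketch. The class name ShutdownDemo and the latch-based task are made up for illustration and are not part of the original program. It assumes a single-thread pool whose only task blocks until released: after shutdown() the task still counts as active, a new submission is rejected, and the pool terminates only once the task finishes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original program.
class ShutdownDemo {

    /** Returns {active count right after shutdown(), 1 if a new submit was rejected, 1 if the pool terminated}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.submit(() -> {
                started.countDown();
                try {
                    release.await(); // stands in for long-running work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await(); // make sure the task is really running

            pool.shutdown(); // orderly shutdown: the running task is left alone
            int active = pool.getActiveCount();

            int rejected = 0;
            try {
                pool.submit(() -> { });
            } catch (RejectedExecutionException e) {
                rejected = 1; // no new tasks are accepted after shutdown()
            }

            release.countDown(); // let the task finish
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] {active, rejected, terminated ? 1 : 0};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("active after shutdown: " + r[0]
                + ", rejected: " + r[1] + ", terminated: " + r[2]);
    }
}
```

If we needed to stop running tasks as well, shutdownNow() would interrupt the workers, but that only helps when the tasks respond to interrupts.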
Why, after changing the flag to false in order to break the infinite loop, doesn't the infinite while break?
We're using a cancellation flag here. But according to the Java memory model, for multiple threads to see changes made to shared data (here, flag), we must either employ synchronization or use the volatile keyword. While synchronization provides both mutual exclusion and memory visibility, volatile provides only memory visibility. Since only the main thread modifies the flag here, volatile is enough. Note that declaring an array reference volatile doesn't make its elements volatile, so a volatile Boolean[] would not guarantee that a write to flag[0] becomes visible. Defining flag as a static volatile boolean (or using an AtomicBoolean) makes it work as expected.
public class Main {
    private static ThreadPoolExecutor fixexThreadPool;
    // volatile applies to the field itself, so use a plain boolean rather than an array
    private static volatile boolean flag = true;
    // the main method: the tasks loop on flag, and main sets flag = false
}
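As a self-contained illustration of the flag approach, here is a minimal sketch. The class name VolatileFlagDemo and the timings are assumptions made for the example, and it uses a plain static volatile boolean field rather than an array. Three tasks spin on the flag, the main thread clears it, and the pool is then able to terminate.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original program.
class VolatileFlagDemo {

    // volatile: a write by the main thread is guaranteed to become visible to the workers
    private static volatile boolean flag = true;

    /** Returns true if all spinning tasks stopped after the flag was cleared. */
    static boolean run() {
        flag = true;
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            pool.submit(() -> {
                long spins = 0;
                while (flag) { // volatile read on every iteration
                    spins++;
                }
            });
        }
        try {
            Thread.sleep(200); // let the tasks spin for a moment
            flag = false;      // volatile write, seen by all three tasks
            pool.shutdown();
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("all tasks stopped: " + run());
    }
}
```

An AtomicBoolean would work equally well here; volatile is the lighter choice because only one thread ever writes the flag.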
Why, after cancelling every future, are there still active threads?
ThreadPoolExecutor uses the FutureTask class as the Future implementation by default. And FutureTask cancels a running task by interrupting the worker thread (when mayInterruptIfRunning is true). So one might expect thread interruption to stop the task and even terminate the thread. But thread interruption is a cooperative mechanism. Firstly, the task must be responsive to interrupts: it must check the interruption flag with Thread.isInterrupted and exit if possible. Secondly, the thread owner, in our case the thread pool, must check the interruption status and act accordingly. In this example, the task isn't responsive to interrupts; it uses a flag to cancel its operation. So let's continue with the thread owner. ThreadPoolExecutor doesn't terminate a worker thread just because it was interrupted, so the thread keeps running the task.
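The following sketch shows that gap between a cancelled Future and a still-running task. The class name CancelIgnoredDemo is hypothetical, and the task deliberately ignores interrupts, as in the question. After cancel(true) the Future reports itself as cancelled, yet the pool still counts the task as active until the flag is cleared.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original program.
class CancelIgnoredDemo {

    private static volatile boolean flag = true;

    /** Returns {future.isCancelled(), task still active after cancel, pool terminated after clearing the flag}. */
    static boolean[] run() {
        flag = true;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                long spins = 0;
                while (flag) { // never looks at the interrupt status
                    spins++;
                }
            });
            started.await(); // the task is running now

            future.cancel(true); // sets the worker's interrupt status
            Thread.sleep(200);   // give the interrupt time to have an effect (it has none)
            boolean cancelled = future.isCancelled();
            boolean stillActive = pool.getActiveCount() == 1;

            flag = false; // the only signal this task listens to
            pool.shutdown();
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] {cancelled, stillActive, terminated};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("cancelled: " + r[0] + ", still active: " + r[1]
                + ", terminated: " + r[2]);
    }
}
```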
No matter if I change the flag to false, shut down the executor, or even cancel all futures, my program doesn't stop running. Why is that?
As mentioned above, the current task uses the cancellation-flag approach, and the pool's worker threads are non-daemon by default, so the JVM stays alive for as long as a task keeps spinning. Making the flag visible with volatile should solve the problem: the tasks then stop as intended, and the pool, already shut down, can terminate. Cancellation, on the other hand, has no effect on the current task because it never checks the interruption status. We can make it responsive like this (with flag declared as a static volatile boolean):
while (flag && !Thread.currentThread().isInterrupted())
This way, the task also supports cancellation by a thread interrupt.
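A minimal sketch of a task using such a combined loop condition follows. The class name CancelResponsiveDemo is made up, and flag is assumed to be a static volatile boolean field. Here the flag is never cleared; cancel(true) alone is enough to end the task and let the pool terminate.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original program.
class CancelResponsiveDemo {

    private static volatile boolean flag = true;

    /** Returns true if the pool terminated after the future was cancelled. */
    static boolean run() {
        flag = true; // stays true: only the interrupt stops the task
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                long spins = 0;
                // Stop on either signal: the flag or a thread interrupt.
                while (flag && !Thread.currentThread().isInterrupted()) {
                    spins++;
                }
            });
            started.await(); // the task is running now

            future.cancel(true); // interrupts the worker; the loop notices and exits
            pool.shutdown();
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated after cancel: " + run());
    }
}
```

Tasks that block in methods such as Thread.sleep or BlockingQueue.take get the same signal as an InterruptedException and should exit from the catch block instead of polling.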