Search code examples
javamultithreadingconcurrency

Why runnable task can't be interrupted with AtomicBoolean flag?


I have next task

public class QuittableTask implements Runnable {
  final int id;

  public QuittableTask(int id) {
    this.id = id;
  }

  private AtomicBoolean running = new AtomicBoolean(true);

  public void quit() {
  System.out.println("Quite " + Thread.currentThread().getName());
     running.set(false);
  }

  @Override
   public void run() {
    while (running.get()) {        
        System.out.println(Thread.currentThread().getName());
        new Nap(0.1);
    }
    System.out.print(id + " ");  
   }

I run a lot of tasks using

public class QuittingTasks {
  public static final int COUNT = 1500;

public static void main(String[] args) throws InterruptedException {
    ExecutorService es = Executors.newCachedThreadPool();
    List<QuittableTask> tasks =
            IntStream.range(1, COUNT)
                    .mapToObj(QuittableTask::new)
                    .peek(es::execute)
                    .collect(Collectors.toList());
    Thread.currentThread().sleep(1000);
    tasks.forEach(QuittableTask::quit);
    es.shutdown();
}

When i use COUNT < 10000 everything is ok. All threads are being terminated. Programm finishes in 2 seconds.

But when i use COUNT > 100001 method quite can't be reached and my programm goes to infinite loop. I don't understand such behaviour. Could somebody explain it to me?


Solution

  • If you look closely on logs (adding some logs on your code) you can see that when the program is generating OutOfMemoryError. And the error looks like below:

    java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:717)
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
        at java.util.stream.ReferencePipeline$11$1.accept(ReferencePipeline.java:372)
        at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
        at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
        at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at com.java8.demo.Main.main(Main.java:58)
    

    That means you are getting OutOfMemory when you try to call quit on all the task but as the error occurs it never gets executed, as a result of your boolean flag never set to false. And other thread keeps running.

    You can notice that out of memory occurs as VM unable to create the native thread.Now you are using Executors.newCachedThreadPool();. So your thread pools max size can be Integer.MAX_VALUE but before reaching that value your VM unable to create new thread. I have changed the executor to FixTheradPool like ExecutorService es = Executors.newFixedThreadPool(8);. It works fine with your higher COUNT value as it will create only 8 thread.

    I have put a try-catch block to trace the exception like below. Also, I forcefully exit the VM by calling System#exit to trace the logs on my console.

    try {
        ExecutorService es = Executors.newCachedThreadPool();
        List<QuittableTask> tasks = IntStream.range(1, COUNT).mapToObj(QuittableTask::new).peek(es::execute)
                .collect(Collectors.toList());
        Thread.currentThread().sleep(1000);
        tasks.forEach(QuittableTask::quit);
        es.shutdown();
    } catch (Throwable e) {
        System.out.println("Error" + e.getMessage());
        e.printStackTrace();
        System.exit(0);
    }