Tags: java, multithreading, threadpool, executorservice, shutdown

How to override the ExecutorService shutdown method


I am creating my own thread pool and future object that can execute Callable tasks in parallel. ExecutorService provides a shutdown method to stop the worker threads. If I create a thread pool like the one below, how should I implement the shutdown method so that it stops only after all threads have finished executing?

My custom thread pool looks like this:

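import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

class MyThreadPool implements Executor
{
    // FutureTask is a Runnable, not a Callable, so the queue holds Runnables.
    private final BlockingQueue<Runnable> queue;

    public MyThreadPool(int numThreads) {
        queue = new LinkedBlockingQueue<>();
        for (int i=0 ; i<numThreads ; i++) {
            new Thread(new Runnable(){
                @Override
                public void run() {
                    while(true) {
                        try {
                            queue.take().run();
                        } catch (InterruptedException e) {
                            return; // take() was interrupted; let the worker exit
                        }
                    }
                }
            }).start();
        }
    }

    // Required by the Executor interface.
    @Override
    public void execute(Runnable command) {
        queue.add(command);
    }

    // Not part of Executor (it belongs to ExecutorService), so no @Override.
    public <T> Future<T> submit(Callable<T> callable) {
        FutureTask<T> future = new FutureTask<>(callable);
        execute(future);
        return future;
    }

    public void shutdown(){ }
}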

I couldn't think of a way to keep a list of the threads and then check whether they are idle.


Solution

  • You definitely should hold references to the threads you're creating. For instance, set up a field threads of type List<Thread> and add the threads to this list from within the constructor.

    Afterwards, you could implement shutdown() with the help of Thread#join():

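    public void shutdown() {
        for (Thread t : threads) {
            try {
                // Block until this worker thread has terminated.
                t.join();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }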

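    Don't forget to replace while (true) with an appropriate condition that you toggle in shutdown(), for example a volatile boolean flag. Also consider using BlockingQueue#poll(long, TimeUnit) rather than take(): take() blocks indefinitely on an empty queue, so an idle worker would never re-check the condition, whereas poll returns null once the timeout elapses and lets the loop evaluate the condition again.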

    EDIT: Something like:

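    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Executor;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.TimeUnit;

    public class MyThreadPool implements Executor {

        private final List<Thread> threads = new ArrayList<>();
        private final BlockingDeque<Callable<?>> tasks = new LinkedBlockingDeque<>();
        // volatile so that the workers see the change made by shutdown()
        private volatile boolean running = true;

        public MyThreadPool(int numberOfThreads) {
            for (int i = 0; i < numberOfThreads; i++) {
                Thread t = new Thread(() -> {
                    while (running) {
                        try {
                            // Wait at most 5 seconds, then re-check the running flag.
                            Callable<?> c = tasks.poll(5L, TimeUnit.SECONDS);
                            if (c != null) {
                                c.call();
                            }
                        } catch (InterruptedException e) {
                            // Interrupted: restore the flag and let the worker exit.
                            Thread.currentThread().interrupt();
                            return;
                        } catch (Exception e) { /* NOP: a failing task must not kill the worker */ }
                    }
                });
                t.start();
                threads.add(t);
            }
        }

        // Workers finish their current task and then exit;
        // tasks still waiting in the queue may never run.
        public void shutdown() {
            running = false;
            for (Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        // ...

    }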
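
If shutdown() should also let tasks that are already queued finish, which is what the question seems to ask for, the same idea can be extended roughly as below. This is only a sketch under assumptions: the class name DrainingThreadPool, the 100 ms poll timeout, the synchronized execute and the choice to reject new tasks after shutdown are all illustrative and not part of the original answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; a sketch, not the answer's exact code.
class DrainingThreadPool implements Executor {

    private final List<Thread> threads = new ArrayList<>();
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    public DrainingThreadPool(int numberOfThreads) {
        for (int i = 0; i < numberOfThreads; i++) {
            Thread t = new Thread(this::workerLoop);
            threads.add(t);
            t.start();
        }
    }

    private void workerLoop() {
        // Run while the pool is open, and afterwards until the queue is empty.
        while (running || !tasks.isEmpty()) {
            try {
                // Assumed timeout: short enough that shutdown() returns promptly.
                Runnable task = tasks.poll(100L, TimeUnit.MILLISECONDS);
                if (task != null) {
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                // A failing plain Runnable must not kill the worker.
            }
        }
    }

    // synchronized together with the flag change in shutdown(), so that no
    // task can be enqueued after the workers have decided to exit.
    @Override
    public synchronized void execute(Runnable command) {
        if (!running) {
            throw new RejectedExecutionException("pool is shut down");
        }
        tasks.add(command);
    }

    public <T> Future<T> submit(Callable<T> callable) {
        FutureTask<T> future = new FutureTask<>(callable);
        execute(future);
        return future;
    }

    // Stops accepting tasks, then waits until every worker has terminated.
    // Must not be called from one of the pool's own worker threads.
    public void shutdown() {
        synchronized (this) {
            running = false;
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        DrainingThreadPool pool = new DrainingThreadPool(2);
        Future<Integer> answer = pool.submit(() -> 6 * 7);
        pool.shutdown();                  // returns after the task has run
        System.out.println(answer.get()); // the future is already complete here
    }
}
```

The loop condition running || !tasks.isEmpty() is what makes the workers drain the queue before exiting. The price is that shutdown() can block for as long as the queued tasks take, plus up to one poll timeout per idle worker.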