java multithreading threadpool

How to implement thread pool without ExecutorService


I am trying to understand thread pools by implementing one myself, without using ExecutorService. However, the program ends up waiting forever and I am not sure what is causing it. Here is my implementation.

I have a Pool class that is responsible for creating the worker threads and keeping them ready for incoming jobs. I initialize the workers in the constructor and add them to the queue of available threads. availableQueue is a BlockingQueue<Executor>, where Executor is an inner class of Pool.

    private void create_workers(int size) {
            for (int i = 0; i< size; i++){
                executors[i] = new Executor("Executor :: " + i);
                availableQueue.offer(executors[i]);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }

The client code creates the pool and submits jobs like this:

Pool threadPool = new Pool(5);
        for (int i=0;i<10;i++){
            threadPool.execute(new Job("Job : " + i));
        }

Job is a simple Runnable class that mimics a job.

The execute method adds the job to jobQueue and waits until an executor is available. When one is available, it takes that executor and starts it to run the job. After completion, it puts a new Executor in the available queue. The reason is that after completion the thread goes into the TERMINATED state and I could not use the same thread again. I am not sure if there is a way to reuse the same thread. Help is needed here if that is possible.

public void execute(Job job){
        LOGGER.log(Level.INFO, "Job added to the queue :: ");
        jobQueue.offer(job);

        while(Pool.isExecutorsAvailable()){
            Executor t = getExecutor();
            if (t.getState().name().equals("NEW")){
                t.start();
            }

            wait_for_completion(t);
            availableQueue.offer(new Executor(t.name));
        }

}

The actual problem is that after the first job the code goes into an infinite waiting state at while(Pool.isExecutorsAvailable()). I could not identify what is causing the issue.

The code for isExecutorsAvailable is here:

   public static boolean isExecutorsAvailable(){
        if(jobQueue.isEmpty()) {
            try {
                LOGGER.log(Level.INFO, Thread.currentThread().getName() + " is waiting..");
                executorLock.wait();
            }
            catch (IllegalMonitorStateException e) {            }
            catch (InterruptedException e) {}
        }
//        executorLock.notify();
        return true;
    }

Any help on this is much appreciated.

Edit:

Added all the code snippets

//Client

package com.java.pool;

public class Client {

    public static void main(String[] args) {
        Client m = new Client();
        Pool threadPool = new Pool(5);
        for (int i=0;i<10;i++){
            threadPool.execute(new Job("Job : " + i));
        }
    }

}

//Job
package com.java.pool;


import java.text.SimpleDateFormat;
import java.util.*;


public class Job implements Runnable{

    String name;

    public Job(String name){
        this.name = name;
    }

    public void print(String message){
        System.out.println(message);
    }

    public void run(){
        String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss").format(new java.util.Date());
        print(Thread.currentThread().getName() + " Executor picked the job at : " + timeStamp);
        work_with_time();
        print("Job " + this.name + " Completed at : " + timeStamp);
    }

    private void work_with_time() {
        Random r = new Random();
        int executionTime = r.nextInt(5000);
        try {
            Thread.sleep(executionTime);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

//Pool

package com.java.pool;


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class Pool {

    private static final Logger LOGGER = Logger.getLogger(Pool.class.getName());

    int size;
    BlockingQueue<Executor> availableQueue;
    Executor[] executors;

    Object availableLock = new Object();
    static Object executorLock = new Object();
    static BlockingQueue<Job> jobQueue;


    public Pool(int size){
        this.size = size;
        availableQueue = new LinkedBlockingQueue<>();
        jobQueue = new LinkedBlockingQueue<>();

        LOGGER.log(Level.INFO, "All Internal Queue's Initialized.");
        LOGGER.log(Level.INFO, "Pool of size :: " + size + " Created.");

        executors = new Executor[size];

        create_workers(size);

        LOGGER.log(Level.INFO, "Threads Created and Ready for Job.");
    }

    private void create_workers(int size) {
        for (int i = 0; i< size; i++){
            executors[i] = new Executor("Executor :: " + i);
            executors[i].start();
            availableQueue.offer(executors[i]);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static boolean isExecutorsAvailable(){
        if(jobQueue.isEmpty()) {
            try {
                LOGGER.log(Level.INFO, Thread.currentThread().getName() + " is waiting..");
                executorLock.wait();
            }
            catch (IllegalMonitorStateException e) {            }
            catch (InterruptedException e) {}
        }
//        executorLock.notify();
        return true;
    }


    private boolean isTaskAvailable(){
        while(availableQueue.isEmpty()) {
            try {
                availableLock.wait();
            }
            catch (IllegalMonitorStateException e) { }
            catch (InterruptedException e) {  }
        }
//        availableLock.notify();
        return true;
    }

    private Executor getExecutor(){

        Executor curr = null;
        if(isTaskAvailable())
            curr = availableQueue.poll();
        return curr;
    }

    public void execute(Job job){
        LOGGER.log(Level.INFO, "Job added to the queue :: ");
        jobQueue.offer(job);

        while(Pool.isExecutorsAvailable()){
            Executor t = getExecutor();
            if (t.getState().name().equals("NEW")){
                t.start();
            }

            wait_for_completion(t);
            availableQueue.offer(new Executor(t.name));
        }

    }

    private static void wait_for_completion(Executor t) {
        while (t.isAlive()){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static class Executor extends Thread{

        String name;


        public Executor(String name){
            this.name = name;

        }
        @Override
        public void run() {
            while(!jobQueue.isEmpty()){
                Job job = jobQueue.poll();
                job.run();
            }
        }
    }

}


Solution

  • IMO, you have inverted the problem: you keep a queue of threads, when what should be queued are the tasks.

    I haven't really tried to understand your code, but here is a complete, trivial thread pool implementation to illustrate the core concept. The static main method demonstrates that it works:

    import java.util.concurrent.Executor;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    
    class SimpleThreadPool implements Executor {
        private
        final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        
        public
        SimpleThreadPool(int numWorkerThreads) { 
            for (int i=0 ; i<numWorkerThreads ; i++) {
                Thread t = new Thread(() -> {
                    while (true) {
                         try {
                             Runnable task = queue.take();
                             task.run();
                         }
                         catch (InterruptedException ex) {
                             ex.printStackTrace();
                             break;
                         }
                    }
                });
                t.start();
            }
        }
    
        @Override
        public
        void execute(Runnable task) {
            queue.add(task);
        }
    
        static public
        void main(String[] args) {
            SimpleThreadPool pool = new SimpleThreadPool(3);
            for (int i=0 ; i<10 ; i++) {
                final int seqNum = i;
                pool.execute(() -> System.out.println(seqNum));
            }
        }
    }
    

    There are various things you could add to this to make it more sophisticated:

    • You could provide a shutdown method*
    • You could provide means for a caller to wait for all of the tasks to be completed.
    • You could add graceful handling (or at least, logging) of Errors and RuntimeExceptions that might be thrown by a task.
    • You could provide means to automatically kill off worker threads that have not been used in a while, and to automatically create new worker threads when demand is high.
    • You could provide means to submit a Callable task and return a Future to the caller.
    • Other stuff (See ExecutorService or ThreadPoolExecutor for more ideas.)
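
    As one illustration of the Callable/Future item above, here is a minimal sketch. It assumes that wrapping the Callable in a java.util.concurrent.FutureTask is acceptable: FutureTask is itself a Runnable, so it can go through the same task queue. The class name FuturePool and the method name submit are made up for this example, and the workers are daemon threads only so that the demo can exit.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical name; same worker loop idea as SimpleThreadPool above.
class FuturePool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    FuturePool(int numWorkerThreads) {
        for (int i = 0; i < numWorkerThreads; i++) {
            Thread t = new Thread(() -> {
                while (true) {
                    try {
                        // Block until a task is available, then run it.
                        queue.take().run();
                    } catch (InterruptedException ex) {
                        break;
                    }
                }
            });
            // Assumption for the demo: daemon workers, so the JVM can exit
            // once main returns even though the pool is never shut down.
            t.setDaemon(true);
            t.start();
        }
    }

    // Wrap the Callable in a FutureTask, queue it, and hand the
    // FutureTask back to the caller as a Future.
    <T> Future<T> submit(Callable<T> task) {
        FutureTask<T> future = new FutureTask<>(task);
        queue.add(future);
        return future;
    }

    public static void main(String[] args) throws Exception {
        FuturePool pool = new FuturePool(2);
        Future<Integer> sum = pool.submit(() -> 1 + 2);
        // get() blocks until a worker has run the task.
        System.out.println(sum.get()); // prints 3
    }
}
```

    A side effect of this choice is that FutureTask captures anything the Callable throws and rethrows it from get() wrapped in an ExecutionException, so a failing task does not kill the worker thread.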

    * Note: The demo program won't terminate on its own. You'll have to forcibly kill it because I did not provide any means to shut the pool down.
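
    For completeness, one possible way to add a shutdown is sketched below. It is only a sketch under some assumptions: it uses a sentinel task (a "poison pill") that tells a worker to exit, the names StoppablePool, STOP, and shutdownAndWait are made up here, and it does nothing to reject tasks that are submitted after shutdown. It also logs RuntimeExceptions from tasks so that a failing task does not kill its worker; Errors are not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical name; a variant of SimpleThreadPool that can be stopped.
class StoppablePool implements Executor {
    // Sentinel task: a worker that takes it from the queue exits its loop.
    private static final Runnable STOP = new Runnable() {
        @Override
        public void run() { }
    };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    StoppablePool(int numWorkerThreads) {
        for (int i = 0; i < numWorkerThreads; i++) {
            Thread t = new Thread(this::workerLoop);
            workers.add(t);
            t.start();
        }
    }

    private void workerLoop() {
        try {
            while (true) {
                Runnable task = queue.take();
                if (task == STOP) {
                    return; // asked to stop
                }
                try {
                    task.run();
                } catch (RuntimeException ex) {
                    // Log and keep going so one bad task does not kill the worker.
                    ex.printStackTrace();
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    @Override
    public void execute(Runnable task) {
        queue.add(task);
    }

    // Queue one STOP per worker, then wait for every worker to finish.
    // The queue is FIFO, so tasks added before this call are taken first.
    void shutdownAndWait() throws InterruptedException {
        for (int i = 0; i < workers.size(); i++) {
            queue.add(STOP);
        }
        for (Thread t : workers) {
            t.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        StoppablePool pool = new StoppablePool(3);
        for (int i = 0; i < 10; i++) {
            final int seqNum = i;
            pool.execute(() -> System.out.println(seqNum));
        }
        pool.shutdownAndWait();
        System.out.println("all tasks done");
    }
}
```

    With this version the demo terminates on its own. The numbers may print in any order because three workers run concurrently, but "all tasks done" prints only after every task has run.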