I am trying to work on some understanding on thread pools and am trying to implement one by myself without using the ExecutorService
. But getting into waiting state for some reason and I am not sure what is causing it. Here is the implementation that I have.
I have a Pool
class which is responsible for creating the worker threads and keeping it ready for jobs that are coming in. I will initialize those in my constructor. Also add those to the queue of available threads.availableQueue
this is a BlockingQueue<Executor>
of Executor
which is an inner class inside the Pool
class
private void create_workers(int size) {
for (int i = 0; i< size; i++){
executors[i] = new Executor("Executor :: " + i);
availableQueue.offer(executors[i]);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
The client code will call get the executor and invoke the job like this
Pool threadPool = new Pool(5);
for (int i=0;i<10;i++){
threadPool.execute(new Job("Job : " + i));
}
Job
class is a simple Runnable class to mimic a job
execute
method will add the job to its jobQueue
and wait if any executors are available or not, if its available then get the executor and invoke the job to execute it. After completion it will put a new Executor in the available queue. Reason for this is after the completion the thread is going into TERMINATED
state and could not get access to the same thread. Not sure if there is a way to repurpose the same thread.. Help needed here if it's possible.
public void execute(Job job){
LOGGER.log(Level.INFO, "Job added to the queue :: ");
jobQueue.offer(job);
while(Pool.isExecutorsAvailable()){
Executor t = getExecutor();
if (t.getState().name().equals("NEW")){
t.start();
}
wait_for_completion(t);
availableQueue.offer(new Executor(t.name));
}
}
The actual problem is that after the first job the code is going on infinite waiting state at while(Pool.isExecutorsAvailable())
I could not identify the problem or not sure whats causing the issue.
The code for isExecutorsAvailable
is here
public static boolean isExecutorsAvailable(){
if(jobQueue.isEmpty()) {
try {
LOGGER.log(Level.INFO, Thread.currentThread().getName() + " is waiting..");
executorLock.wait();
}
catch (IllegalMonitorStateException e) { }
catch (InterruptedException e) {}
}
// executorLock.notify();
return true;
}
Any help on this is much appreciated
Edit:
Added all the code snippets
//Client
package com.java.pool;
public class Client {
public static void main(String[] args) {
Client m = new Client();
Pool threadPool = new Pool(5);
for (int i=0;i<10;i++){
threadPool.execute(new Job("Job : " + i));
}
}
}
//Job
package com.java.pool;
import java.text.SimpleDateFormat;
import java.util.*;
public class Job implements Runnable{
String name;
public Job(String name){
this.name = name;
}
public void print(String message){
System.out.println(message);
}
public void run(){
String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss").format(new java.util.Date());
print(Thread.currentThread().getName() + " Executor picked the job at : " + timeStamp);
work_with_time();
print("Job " + this.name + " Completed at : " + timeStamp);
}
private void work_with_time() {
Random r = new Random();
int executionTime = r.nextInt(5000);
try {
Thread.sleep(executionTime);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
//Pool
package com.java.pool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
public final class Pool {
private static final Logger LOGGER = Logger.getLogger(Pool.class.getName());
int size;
BlockingQueue<Executor> availableQueue;
Executor[] executors;
Object availableLock = new Object();
static Object executorLock = new Object();
static BlockingQueue<Job> jobQueue;
public Pool(int size){
this.size = size;
availableQueue = new LinkedBlockingQueue<>();
jobQueue = new LinkedBlockingQueue<>();
LOGGER.log(Level.INFO, "All Internal Queue's Initialized.");
LOGGER.log(Level.INFO, "Pool of size :: " + size + " Created.");
executors = new Executor[size];
create_workers(size);
LOGGER.log(Level.INFO, "Threads Created and Ready for Job.");
}
private void create_workers(int size) {
for (int i = 0; i< size; i++){
executors[i] = new Executor("Executor :: " + i);
executors[i].start();
availableQueue.offer(executors[i]);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static boolean isExecutorsAvailable(){
if(jobQueue.isEmpty()) {
try {
LOGGER.log(Level.INFO, Thread.currentThread().getName() + " is waiting..");
executorLock.wait();
}
catch (IllegalMonitorStateException e) { }
catch (InterruptedException e) {}
}
// executorLock.notify();
return true;
}
private boolean isTaskAvailable(){
while(availableQueue.isEmpty()) {
try {
availableLock.wait();
}
catch (IllegalMonitorStateException e) { }
catch (InterruptedException e) { }
}
// availableLock.notify();
return true;
}
private Executor getExecutor(){
Executor curr = null;
if(isTaskAvailable())
curr = availableQueue.poll();
return curr;
}
public void execute(Job job){
LOGGER.log(Level.INFO, "Job added to the queue :: ");
jobQueue.offer(job);
while(Pool.isExecutorsAvailable()){
Executor t = getExecutor();
if (t.getState().name().equals("NEW")){
t.start();
}
wait_for_completion(t);
availableQueue.offer(new Executor(t.name));
}
}
private static void wait_for_completion(Executor t) {
while (t.isAlive()){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private static class Executor extends Thread{
String name;
public Executor(String name){
this.name = name;
}
@Override
public void run() {
while(!jobQueue.isEmpty()){
Job job = jobQueue.poll();
job.run();
}
}
}
}
IMO, you are trying to invert the problem by having a queue of threads when what should be queued are the tasks.
I haven't really tried to understand your code, but here is a complete, trivial thread pool implementation to illustrate the core concept. The static main
method demonstrates that it works:
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;
class SimpleThreadPool implements Executor {
private
final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
public
SimpleThreadPool(int numWorkerThreads) {
for (int i=0 ; i<numWorkerThreads ; i++) {
Thread t = new Thread(() -> {
while (true) {
try {
Runnable task = queue.take();
task.run();
}
catch (InterruptedException ex) {
ex.printStackTrace();
break;
}
}
});
t.start();
}
}
@Override
public
void execute(Runnable task) {
queue.add(task);
}
static public
void main(String[] args) {
SimpleThreadPool pool = new SimpleThreadPool(3);
for (int i=0 ; i<10 ; i++) {
final int seqNum = i;
pool.execute(() -> System.out.println(seqNum));
}
}
}
There are various things you could add to this to make it more sophisticated;
shutdown
method*Error
s and RuntimeException
s that might be thrown by a task.submit
a Callable
task and return a Future
to the caller.ExecutorService
or ThreadPoolExecutor
for more ideas.)* Note: The demo program won't terminate on its own. You'll have to forcibly kill it because I did not provide any means to shut the pool down.