java java.util.concurrent threadpoolexecutor

Why do ThreadPoolExecutor threads not run concurrently when using a PriorityBlockingQueue?


I am using Java's ThreadPoolExecutor to run tasks concurrently, and I used an ArrayBlockingQueue to hold the queued tasks. The requirement has now changed: I need to add tasks at run time (with no size limit), and they should be executed by priority. So I decided to replace the ArrayBlockingQueue with a PriorityBlockingQueue and some comparison logic. After the switch, the tasks run sequentially, one after another, instead of concurrently: only one thread runs at a time, regardless of the configured active thread count. Does anybody have suggestions for resolving this and meeting my requirement (tasks added to the pool at run time and executed by priority)?

My demo code:

    //RejectedExecutionHandler implementation
    RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
    //Get the ThreadFactory implementation to use
    ThreadFactory threadFactory = Executors.defaultThreadFactory();
    //Unbounded priority queue (50 is only the initial capacity)
    BlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>(50, ThreadComparator.getComparator());
    ThreadPoolExecutor executorPool = new ThreadPoolExecutor(1, activeThread, 10, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);
    //start the monitoring thread
    MyMonitorThread monitor = new MyMonitorThread(executorPool, 20, "Demo");
    Thread monitorThread = new Thread(monitor);
    monitorThread.start();

    for (int i = 0; i < totalThead; i++) {
        int prio = i % 3 == 0 ? 3 : 5;
        executorPool.execute(new MyThread("Thread-" + i, prio));        
    }

    // Inserting more threads in between concurrent execution.
    try {
        Thread.sleep(40000);
        for (int j = 101; j < 110; j++) {
            executorPool.execute(new MyThread("Thread-" + j, 2));
        }
    } catch (InterruptedException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }


    while(executorPool.getActiveCount() != 0) {
        try {
            Thread.sleep(10000); 
        } catch (InterruptedException e) {
            System.out.println("Error while thread sleeping: " + e);
        }
    }
    //shut down the pool
    executorPool.shutdown();
    //shut down the monitor thread
    try {
        Thread.sleep(5000); 
    } catch (InterruptedException e) {
        System.out.println("Error while thread sleeping: " + e);
    }
    monitor.shutdown();

    public abstract class ThreadComparator implements Comparator<Runnable> {

        public static Comparator<Runnable> getComparator() {
            return new Comparator<Runnable>() {
                @Override
                public int compare(Runnable t1, Runnable t2) {
                    CompareToBuilder compare = new CompareToBuilder();
                    MyThread mt1 = (MyThread) t1;
                    MyThread mt2 = (MyThread) t2;
                    compare.append(mt1.getPriority(), mt2.getPriority());
                    return compare.toComparison();
                }
            };
        }

    }


Solution

  • This is the expected behaviour of ThreadPoolExecutor with an unbounded work queue.

    To cite the ThreadPoolExecutor JavaDoc:

    Core and maximum pool sizes
    A ThreadPoolExecutor will automatically adjust the pool size [..]. When a new task is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. [...]

    Since you define corePoolSize as 1 and a PriorityBlockingQueue is essentially an unbounded queue (that can never become full), you will never have more than one thread.

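    The fix is to set corePoolSize to the number of threads you need. With an unbounded queue, maximumPoolSize has no effect, so corePoolSize alone determines how many tasks can run at once. If idle threads should still be released after the keep-alive time, you can call allowCoreThreadTimeOut(true) on the executor.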
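
To make the effect of corePoolSize concrete, here is a minimal sketch that measures how many tasks run at the same time on a pool backed by a PriorityBlockingQueue. The names PriorityPoolSketch, PrioritizedTask and peakConcurrency are hypothetical and not from the question; PrioritizedTask is only a stand-in for the asker's MyThread, and the 200 ms sleep simulates work.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PriorityPoolSketch {

    // Hypothetical stand-in for the question's MyThread: a task with a priority
    // that also records how many tasks are running at the same time.
    static final class PrioritizedTask implements Runnable {
        final int priority;
        private final AtomicInteger running;
        private final AtomicInteger peak;

        PrioritizedTask(int priority, AtomicInteger running, AtomicInteger peak) {
            this.priority = priority;
            this.running = running;
            this.peak = peak;
        }

        @Override
        public void run() {
            int now = running.incrementAndGet();
            peak.accumulateAndGet(now, Math::max); // remember the highest value seen
            try {
                Thread.sleep(200); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        }
    }

    // Runs taskCount tasks and returns the peak number that executed concurrently.
    static int peakConcurrency(int corePoolSize, int maximumPoolSize, int taskCount) {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        // Unbounded queue: offer() always succeeds, so the pool never sees it as full.
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>(
                50, Comparator.comparingInt((Runnable r) -> ((PrioritizedTask) r).priority));

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, 10, TimeUnit.SECONDS, queue);

        for (int i = 0; i < taskCount; i++) {
            int prio = i % 3 == 0 ? 3 : 5; // same priority pattern as the question
            pool.execute(new PrioritizedTask(prio, running, peak));
        }

        pool.shutdown(); // already queued tasks still run to completion
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("core=1, max=4: peak = " + peakConcurrency(1, 4, 8));
        System.out.println("core=4, max=4: peak = " + peakConcurrency(4, 4, 8));
    }
}
```

With corePoolSize 1 the reported peak should be 1 no matter what maximumPoolSize is, which matches the behaviour described in the question. With corePoolSize 4 the peak is expected to rise above 1 and normally reach 4, although the exact value depends on thread scheduling. Note that tasks must be passed to execute() rather than submit() for a comparator like this one to work, because submit() wraps each task in a FutureTask before it reaches the queue.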