java multithreading blockingqueue scheduledexecutorservice

Use PriorityBlockingQueue with Comparator in ScheduledThreadPoolExecutor


First of all: I already read the following two questions and their possible solutions:

My dilemma is that I want to use a custom BlockingQueue, specifically a PriorityBlockingQueue with a custom Comparator that orders the queue by priority.

ThreadPoolExecutor does support custom queues in its constructors, but it does not implement the methods of the ScheduledExecutorService interface. Its subclass ScheduledThreadPoolExecutor does implement them, but it does not accept a custom queue and always uses its internal DelayedWorkQueue instead.

Problems:

  • I cannot simply extend ScheduledThreadPoolExecutor: adding constructors to my own subclass achieves nothing, because none of the ScheduledThreadPoolExecutor constructors accepts a custom queue as a parameter.
  • I cannot copy the contents of ThreadPoolExecutor and the implementation of ScheduledThreadPoolExecutor into my own class either, because that code relies on many methods declared without an access modifier (e.g. canRunInCurrentState(boolean periodic) and everything it calls). Such package-private methods are inaccessible to my class: even though it is a subclass of ThreadPoolExecutor, it is not in the same package.

My current implementation looks like this:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.croemheld.tasks.PriorityTaskComparator;

public class ScheduledPriorityThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {

    private static final int INITIAL_QUEUE_SIZE = 10;

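    // Note: in all four constructors the workQueue argument is ignored; a PriorityBlockingQueue is always used.
    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,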
            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()));
    }

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), handler);
    }

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), threadFactory);
    }

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), threadFactory, handler);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

}

As you can see, the constructor problem is solved, but that still leaves the scheduling methods from ScheduledExecutorService unimplemented.

So my question is: is there a way to pass a Comparator to the queue, or a simple and not too laborious way to create my own executor class that implements the methods of ScheduledExecutorService, offers the methods of ThreadPoolExecutor, and uses a PriorityBlockingQueue?


Solution

  • I looked for other possible solutions and came to the following result:

    Since a ThreadPoolExecutor manages a pool of multiple threads (i.e. if you pass two or more threads to Executors.newFixedThreadPool(int nThreads)), and if you really want to combine a priority-based BlockingQueue with it, then I would suggest the following:

    • Create your own ThreadPoolExecutor class similar to the one above, using a PriorityBlockingQueue with a custom comparator.
    • Create your own Task class (or a FutureTask extension, whichever suits you best).
    • These task classes are for one-shot tasks, meaning they run just once.
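
    The first two suggestions above could be sketched roughly as follows. This is only a minimal illustration, not the code from the question: PriorityTask, PriorityPoolDemo and the rule "lower number runs first" are assumptions made up for this example, and the comparator stands in for the question's PriorityTaskComparator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical one-shot task that carries a priority (lower value runs first). */
class PriorityTask implements Runnable {
    final int priority;
    private final Runnable body;

    PriorityTask(int priority, Runnable body) {
        this.priority = priority;
        this.body = body;
    }

    @Override
    public void run() {
        body.run();
    }
}

class PriorityPoolDemo {

    /** Builds a pool whose queue orders waiting tasks by PriorityTask.priority. */
    static ThreadPoolExecutor newPriorityPool(int threads) {
        // Every queued Runnable must be a PriorityTask, otherwise the cast fails.
        Comparator<Runnable> byPriority =
                Comparator.comparingInt(r -> ((PriorityTask) r).priority);
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<>(10, byPriority));
    }

    /** Queues three tasks behind a blocked worker and returns their execution order. */
    static List<String> runDemo() throws InterruptedException {
        ThreadPoolExecutor pool = newPriorityPool(1);
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);

        // Occupy the single worker so the following tasks have to wait in the queue.
        pool.execute(new PriorityTask(0, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        pool.execute(new PriorityTask(3, () -> order.add("low")));
        pool.execute(new PriorityTask(1, () -> order.add("high")));
        pool.execute(new PriorityTask(2, () -> order.add("medium")));

        gate.countDown();   // release the worker; the queue now decides the order
        pool.shutdown();    // already queued tasks still run
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new ArrayList<>(order);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // [high, medium, low]
    }
}
```

    Note that the sketch uses execute() on purpose: submit() wraps every task in a FutureTask before it reaches the queue, so a comparator that casts to the task class would fail with a ClassCastException. Priorities also only matter while tasks are waiting; a task that is handed directly to an idle worker never passes through the comparator.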

    For looping tasks that should run periodically in the background, I came up with a simple base class:

    public abstract class AbstractThread extends Thread {
    
        protected Runnable runnable;
    
        protected AbstractThread(String name, Runnable runnable) {
            super(runnable, name);
    
            this.runnable = runnable;
        }
    
        /**
         * This method provides a way to perform some action before the thread actually starts its work.
         */
        protected abstract void beforeExecution();
    
        /**
         * This method provides a way to perform some action after the thread finished.
         */
        protected abstract void afterExecution();
    
        @Override
        public void run() {
            try {
                doRun();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so that callers can see the thread was interrupted.
                Thread.currentThread().interrupt();
            }
        }
    
        /**
         * Run the given runnable here.
         * 
         * @throws InterruptedException 
         */
        protected abstract void doRun() throws InterruptedException;
    
    }
    

    Simple one-shot threads just run the runnable once:

    @Override
    protected void doRun() {
        beforeExecution();
    
        runnable.run();
    
        afterExecution();
    }
    

    A periodic task in a thread would do something like this:

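    @Override
    protected void doRun() throws InterruptedException {
        beforeExecution();

        try {
            while (!isInterrupted()) {
                runnable.run();
                // millis: pause between two runs, assumed to be a field of the subclass
                Thread.sleep(millis);
            }
        } finally {
            // Runs even when sleep() is interrupted, which is how the loop usually ends.
            afterExecution();
        }
    }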
    

    If you want to support periodic tasks that run only once in a while, you could either pass a delay parameter to the Thread instance or simply call something like Thread.sleep(delay) in your runnable.
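
    Passing the timing to the thread could look roughly like this. It is a self-contained sketch that does not extend AbstractThread to keep it short; PeriodicThread, initialDelayMillis and periodMillis are names invented for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical thread: waits initialDelayMillis, then runs the task every periodMillis. */
class PeriodicThread extends Thread {
    private final Runnable task;
    private final long initialDelayMillis;
    private final long periodMillis;

    PeriodicThread(String name, Runnable task, long initialDelayMillis, long periodMillis) {
        super(name);
        this.task = task;
        this.initialDelayMillis = initialDelayMillis;
        this.periodMillis = periodMillis;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(initialDelayMillis);
            while (!isInterrupted()) {
                task.run();
                Thread.sleep(periodMillis);
            }
        } catch (InterruptedException e) {
            // interrupt() is used as the stop signal; restore the flag and leave the loop.
            Thread.currentThread().interrupt();
        }
    }
}

class PeriodicThreadDemo {

    /** Lets the task tick for a short while, stops the thread, and returns the run count. */
    static int runDemo() throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        PeriodicThread ticker = new PeriodicThread("ticker", runs::incrementAndGet, 20, 10);
        ticker.start();
        Thread.sleep(200);
        ticker.interrupt();   // ask the thread to stop
        ticker.join(1000);    // wait until it has actually finished
        return runs.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("runs: " + runDemo());
    }
}
```

    The exact number of runs depends on thread scheduling, so the sketch prints it instead of promising a value.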

    This is not real code, just a suggestion, as I'm still trying to get this to work myself.
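
    A different way to get both delays and priorities, not the thread-based approach above, is to let a ScheduledExecutorService act purely as a timer that forwards each task to a priority-ordered ThreadPoolExecutor once its delay has elapsed. The following is a sketch under that assumption; DelayedPriorityExecutor, Prioritized and the "lower number runs first" rule are hypothetical names and choices for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: a timer thread feeds a priority-ordered worker pool. */
class DelayedPriorityExecutor {

    /** Pairs a Runnable with a priority; lower values run first. */
    static final class Prioritized implements Runnable {
        final int priority;
        private final Runnable body;

        Prioritized(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }
    }

    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final ThreadPoolExecutor workers;

    DelayedPriorityExecutor(int threads) {
        Comparator<Runnable> byPriority =
                Comparator.comparingInt(r -> ((Prioritized) r).priority);
        workers = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<>(10, byPriority));
    }

    /** After the delay, the timer thread hands the task to the priority pool. */
    void schedule(int priority, Runnable body, long delay, TimeUnit unit) {
        timer.schedule(() -> workers.execute(new Prioritized(priority, body)), delay, unit);
    }

    void shutdownAndAwait() throws InterruptedException {
        timer.shutdown();   // one-shot tasks that are already scheduled still fire by default
        timer.awaitTermination(5, TimeUnit.SECONDS);
        workers.shutdown();
        workers.awaitTermination(5, TimeUnit.SECONDS);
    }

    /** Two delayed tasks pile up behind a blocked worker and then run by priority. */
    static List<String> runDemo() throws InterruptedException {
        DelayedPriorityExecutor exec = new DelayedPriorityExecutor(1);
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);

        // Keeps the single worker busy until the gate opens.
        exec.schedule(0, () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 0, TimeUnit.MILLISECONDS);
        exec.schedule(5, () -> order.add("low"), 20, TimeUnit.MILLISECONDS);
        exec.schedule(1, () -> order.add("high"), 40, TimeUnit.MILLISECONDS);

        // Wait until both delayed tasks have reached the priority queue.
        while (exec.workers.getQueue().size() < 2) {
            Thread.sleep(5);
        }
        gate.countDown();
        exec.shutdownAndAwait();
        return new ArrayList<>(order);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // [high, low]
    }
}
```

    With this split, the delay decides when a task becomes eligible and the priority decides the order among eligible tasks that are waiting for a free worker. The sketch does not return a ScheduledFuture and does not cover periodic scheduling, so it is not a full ScheduledExecutorService implementation.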