java multithreading concurrency queue

Replace a BlockingQueue + daemon thread with an Executor


I have a BlockingQueue with tasks, and a single daemon thread executing them:

public class TaskManager {

  private final BlockingQueue<Task> taskQueue;

  public TaskManager() {
    this.taskQueue = new ArrayBlockingQueue<>(5); //Max 5 tasks waiting
    Thread taskRunner = new Thread(this::runTask);
    taskRunner.setDaemon(true); //Infinite loop inside shouldn't prevent JVM shutdown
    taskRunner.start();
  }

  //Called async
  public void enqueueTask(Task task) {
    boolean added = false;
    try {
      added = taskQueue.offer(task, 5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {/*no-op*/}
    if (!added) {
      publishEvent(new Fail(task)); //Async notify the client their task won't run
    }
  }

  //Also called async
  public void dequeueTask(Task task) {
    this.taskQueue.remove(task);
  }

  void runTask() {
    while (true) {
      try {
        Task task = taskQueue.take(); //Blocks indefinitely
        doStuff(task);
      } catch (InterruptedException e) {
        //Can only be a spurious interruption. Ignore.
      }
    }
  }
}

Doesn't seem too special. But it feels like I'm reinventing the wheel by managing the queue and the daemon myself. I'd expect this to be easily doable with an Executor(Service) instead. I could have a ThreadPoolExecutor with 1 thread and give it a bounded queue. It exposes a remove method for me to dequeue tasks. But it seems there's no way to time out if enqueuing takes too long (the 5s timeout on taskQueue.offer() in my example).

Is what I'm doing already legit and I'm just overthinking? If not, can I replace this with an Executor and how?


Solution

  • It’s important to understand that ThreadPoolExecutor never waits for the queue to become “not full”. It calls offer, which returns false immediately when the queue is at capacity. If that happens and the pool has already reached its configured maximum size, the executor invokes the RejectedExecutionHandler.

    So, you can specify a maximum thread count of one and this RejectedExecutionHandler:

    RejectedExecutionHandler tryForFiveSecs = (r, es) -> {
        boolean added = false;
        try {
            //blocks the submitting thread for up to five seconds
            added = es.getQueue().offer(r, 5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); //keep the interrupt visible to the caller
        }
        if(!added) {
            //assuming the Runnable and "Task" are interchangeable
            publishEvent(new Fail(r));
        }
    };
    

    Pass this handler to the constructor ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler).
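A minimal sketch of how the pieces might fit together, assuming a core and maximum pool size of one, a daemon ThreadFactory (to mirror the question's daemon thread), and a bounded ArrayBlockingQueue. The class name TaskManagerSketch and the onFail callback are hypothetical; onFail merely stands in for publishEvent(new Fail(task)), and tasks are assumed to be plain Runnables.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical executor-backed variant of the question's TaskManager.
final class TaskManagerSketch {

    private final ThreadPoolExecutor executor;

    // onFail is an assumed hook standing in for publishEvent(new Fail(task)).
    TaskManagerSketch(int capacity, long timeout, TimeUnit unit, Consumer<Runnable> onFail) {
        // Runs in the submitting thread when the queue is full and the single worker is busy.
        RejectedExecutionHandler tryWithTimeout = (r, es) -> {
            boolean added = false;
            try {
                added = es.getQueue().offer(r, timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            }
            if (!added) {
                onFail.accept(r);
            }
        };
        // Daemon worker, so the idle pool does not keep the JVM alive.
        ThreadFactory daemonFactory = r -> {
            Thread t = new Thread(r, "task-runner");
            t.setDaemon(true);
            return t;
        };
        this.executor = new ThreadPoolExecutor(
                1, 1,                      // exactly one worker thread
                0L, TimeUnit.MILLISECONDS, // keep-alive only matters for threads above the core size
                new ArrayBlockingQueue<>(capacity),
                daemonFactory,
                tryWithTimeout);
    }

    void enqueueTask(Runnable task) {
        executor.execute(task);
    }

    // Returns true if the task was still waiting in the queue and has been removed.
    boolean dequeueTask(Runnable task) {
        return executor.remove(task);
    }
}
```

With a capacity of 5 and a timeout of 5 seconds this should behave much like the original class: the first task goes straight to the worker, later tasks wait in the queue, and a task that cannot be queued within the timeout is reported through onFail.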

    Note that this covers uses of execute, which is sufficient for your existing workflow. If you start using submit and the returned futures, keep in mind that the futures returned for failed jobs will never complete, unless you extend the handler to cancel them, e.g.:

    RejectedExecutionHandler tryForFiveSecs = (r, es) -> {
        boolean added = false;
        try {
            //blocks the submitting thread for up to five seconds
            added = es.getQueue().offer(r, 5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); //keep the interrupt visible to the caller
        }
        if(!added) {
            //assuming the Runnable and "Task" are interchangeable
            publishEvent(new Fail(r));
            //submit wraps the job in a Future; cancel it so that get() does not block forever
            if(r instanceof Future<?> f) f.cancel(false);
        }
    };
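To illustrate the effect of the extra cancel call, here is a small sketch under assumed settings: one worker, a queue with a single slot, and a 100 ms timeout instead of five seconds. The class and method names are hypothetical, and the instanceof check is written with a classic cast so it also compiles on Java versions without pattern matching.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class RejectedFutureSketch {

    // Builds a one-thread executor with a one-slot queue and the cancelling handler.
    static ThreadPoolExecutor newExecutor(long timeoutMillis) {
        RejectedExecutionHandler handler = (r, es) -> {
            boolean added = false;
            try {
                added = es.getQueue().offer(r, timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // submit() wraps the job in a FutureTask, which is both a Runnable and a Future
            if (!added && r instanceof Future) {
                ((Future<?>) r).cancel(false);
            }
        };
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), handler);
    }

    // Returns true if the job that could not be queued ends up cancelled.
    static boolean rejectedFutureIsCancelled() {
        ThreadPoolExecutor es = newExecutor(100);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            es.submit(() -> awaitQuietly(gate));    // occupies the single worker
            es.submit(() -> { });                   // fills the one queue slot
            Future<?> third = es.submit(() -> { }); // handler waits, gives up, cancels
            return third.isCancelled();
        } finally {
            gate.countDown(); // release the worker so the pool can drain
            es.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the third future is cancelled, a caller invoking get() on it receives a CancellationException instead of waiting indefinitely.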