I have a BlockingQueue
with tasks, and a single daemon thread executing them:
public class TaskManager {
private final BlockingQueue<Task> taskQueue;
public TaskManager() {
this.taskQueue = new ArrayBlockingQueue<>(5); //Max 5 tasks waiting
Thread taskRunner = new Thread(this::runTask);
taskRunner.setDaemon(true); //Infinite loop inside shouldn't prevent JVM shutdown
taskRunner.start();
}
//Called async
public void enqueueTask(Task task) {
boolean added = false;
try {
added = taskQueue.offer(task, 5, TimeUnit.SECONDS);
} catch (InterruptedException e) {/*no-op*/}
if (!added) {
publishEvent(new Fail(task)); //Async notify the client their task won't run
}
}
//Also called async
public void dequeueTask(Task task) {
this.taskQueue.remove(task);
}
void runTask() {
while (true) {
try {
Task task = taskQueue.take(); //Blocks indefinitely
doStuff(task);
} catch (InterruptedException e) {
//Can only be a spurious interruption. Ignore.
}
}
}
}
Doesn't seem too special. But. It feels like I'm reinventing the wheel by managing the queue and the daemon myself. I'd expect this to be easily doable with an Executor(Service)
instead. I could have a ThreadPoolExecutor
with 1 thread, give it a bounded queue. It exposes a remove
method for me to dequeue tasks. But it seems there's no way to timeout if enqueuing takes too long (5s timeout on taskQueue.offer()
in my example).
Is what I'm doing already legit and I'm just overthinking? If not, can I replace this with an Executor
and how?
It’s important to understand that ThreadPoolExecutor
does never wait for the queue to become “not full”. It uses offer
which immediately returns false
if the capacity is exhausted and if that happens and the configured maximum thread size has been reached, the executor will call the RejectedExecutionHandler
.
So, you can specify a maximum thread count of one and this RejectedExecutionHandler
:
RejectedExecutionHandler tryForFiveSecs = (r, es) -> {
boolean added = false;
try {
BlockingQueue<Runnable> taskQueue = ((ThreadPoolExecutor)es).getQueue();
added = taskQueue.offer(r, 5, TimeUnit.SECONDS);
} catch (InterruptedException e) {/*no-op*/}
if(!added) {
//assuming the Runnable and "Task" are interchangeable
publishEvent(new Fail(r));
}
};
Using the constructor ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
.
Note that this covers uses of execute
which is sufficient for your existing workflows. If you start using submit
and the returned futures, you have to consider that the futures returned for failed jobs will never finish. Unless, you extend the handler, e.g.:
RejectedExecutionHandler tryForFiveSecs = (r, es) -> {
boolean added = false;
try {
BlockingQueue<Runnable> taskQueue = ((ThreadPoolExecutor)es).getQueue();
added = taskQueue.offer(r, 5, TimeUnit.SECONDS);
} catch (InterruptedException e) {/*no-op*/}
if(!added) {
//assuming the Runnable and "Task" are interchangeable
publishEvent(new Fail(r));
if(r instanceof Future<?> f) f.cancel(false);
}
};