I am writing a program which uses a CompletionService to run threaded analyses on a bunch of different objects, where each "analysis" consists of taking in a string and doing some computation to give either true or false as an answer. My code looks essentially like this:
    // tasks come from a different method and contain the strings + some other needed info
    List<Future<Pair<Pieces, Boolean>>> futures = new ArrayList<>(tasks.size());
    for (Task task : tasks) {
        futures.add(executorCompletionService.submit(task));
    }
    ArrayList<Pair<Pieces, Boolean>> pairs = new ArrayList<>();
    int toComplete = tasks.size();
    int received = 0;
    int failed = 0;
    while (received < toComplete) {
        Future<Pair<Pieces, Boolean>> resFuture = executorCompletionService.take();
        received++;
        Pair<Pieces, Boolean> res = resFuture.get();
        if (!res.getValue()) failed++;
        if (failed > 300) {
            // My problem is here
        }
        pairs.add(res);
    }
    // return pairs and go on to do something else
In the marked section, my goal is to abandon the computation once more than 300 strings have failed, so that I can move on to a new analysis, calling this method again with some different data. The problem is that since the same CompletionService is used again, if I do not somehow clear the queue, then the worker queue will keep growing as I keep adding more to it every time I use it (since after 300 failures there are likely still many unprocessed strings left).
I have tried to loop through the futures list and cancel all unfinished tasks using something like futures.forEach(future -> future.cancel(true)). However, when I next call the method I get a java.util.concurrent.CancellationException when I call resFuture.get().
(Edit: It seems that even though I call futures.forEach(future -> future.cancel(true)), this does not guarantee that the workerQueue is actually clear afterwards. I do not understand why this is. It almost seems as if it takes a while to clear the queue, and the code does not wait for this to happen before moving on to the next analysis, so occasionally get() will be called on a future which has been cancelled.)
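A minimal sketch of what appears to be happening, assuming the standard JDK ExecutorCompletionService: cancel(true) marks a future as cancelled but does not remove its task from the executor's work queue. Each cancelled task still has to be picked up by a worker (where it does nothing), and only then does its future show up on the completion queue, where calling get() on it throws CancellationException. The class name, method name, and the latch used to keep the tasks from finishing are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CancelledFuturesDemo {

    /**
     * Submits taskCount tasks that cannot finish on their own, cancels all of
     * them, and returns how many cancelled futures still arrive on the
     * completion queue.
     */
    public static int countCancelledOnQueue(int taskCount) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletionService<Boolean> service = new ExecutorCompletionService<>(pool);
        CountDownLatch gate = new CountDownLatch(1); // never opened before cancellation
        try {
            List<Future<Boolean>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(service.submit(() -> {
                    gate.await(); // blocks until cancelled (interrupted)
                    return true;
                }));
            }

            // Same cancellation as in the question
            futures.forEach(future -> future.cancel(true));

            // Every cancelled future is still delivered through take()
            int cancelled = 0;
            for (int i = 0; i < taskCount; i++) {
                Future<Boolean> future = service.take();
                if (future.isCancelled()) {
                    cancelled++; // calling future.get() here would throw CancellationException
                }
            }
            return cancelled;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            gate.countDown();
            pool.shutdownNow();
        }
    }
}
```

If this is the cause, a consumer loop that reuses the same CompletionService would need to check isCancelled() before calling get(), or otherwise get rid of the stale futures left over from the previous batch.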
I have also tried the following to empty the queue:
    while (received < toComplete) {
        executorCompletionService.take();
        received++;
    }
While this works, it is barely faster than just running all of the analyses anyway, so it does little for efficiency.
My question is whether there is a better way to empty the worker queue, such that when I next call this code it is as if the CompletionService is new again.
Edit: Another method I have tried is just assigning a newly created CompletionService to executorCompletionService on each call, which is slightly faster than my other solution but is still rather slow and definitely not good practice.
P.S.: I am also happy to accept any other way of achieving this. I am not attached to using a CompletionService; it has just been the easiest thing for what I've done so far.
This has since been resolved, but I have seen other similar questions with no good answer so here is my solution:
Previously, I was using an ExecutorService to create my ExecutorCompletionService(ExecutorService). I switched the ExecutorService to a ThreadPoolExecutor, and since under the hood the ExecutorService already is a ThreadPoolExecutor, all method signatures can be fixed with just a cast. (In current JDKs this holds for executors created with Executors.newFixedThreadPool or Executors.newCachedThreadPool; Executors.newSingleThreadExecutor returns a wrapper that cannot be cast.) Using the ThreadPoolExecutor gives you much more control over the backend. Specifically, you can call threadPoolExecutor.getQueue().clear(), which removes all tasks that are still waiting to run. Finally, I needed to make sure to "drain" the tasks that were still running, so my final cancelling code looked like this:
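    if (failed > maxFailures) {
        // Drop every task that has not started yet
        executorService.getQueue().clear();
        // Wait for the tasks that are already running, discarding their results
        while (executorService.getActiveCount() > 0) {
            executorCompletionService.poll();
        }
    }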
At the end of this code block, the executor will be ready to run again.
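For anyone who wants to try this out, here is a self-contained sketch of the same idea. It is not my production code: the class and method names, the pool size of 2, the always-failing 5 ms tasks, and the timed poll are all assumptions made for illustration. It constructs the ThreadPoolExecutor directly so no cast is needed, polls with a short timeout instead of spinning, and adds one extra loop to pick up any results that landed on the completion queue just as the last worker finished. Note that getActiveCount() is documented as approximate, so treat this as a starting point rather than a guarantee.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AbandonBatchDemo {

    /**
     * Submits taskCount always-failing tasks, abandons the batch once more
     * than maxFailures have failed, and returns
     * {results collected, tasks still waiting in the executor queue}.
     */
    public static int[] runAndAbandon(int taskCount, int maxFailures) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CompletionService<Boolean> service = new ExecutorCompletionService<>(pool);
        try {
            for (int i = 0; i < taskCount; i++) {
                service.submit(() -> {
                    Thread.sleep(5); // stand-in for the real analysis
                    return false;    // every analysis "fails" in this demo
                });
            }

            int received = 0;
            int failed = 0;
            while (received < taskCount) {
                boolean ok = service.take().get();
                received++;
                if (!ok) failed++;
                if (failed > maxFailures) {
                    // Drop every task that has not started yet
                    pool.getQueue().clear();
                    // Wait for the running tasks, discarding their results
                    while (pool.getActiveCount() > 0) {
                        service.poll(1, TimeUnit.MILLISECONDS);
                    }
                    // Discard results that completed just before the loop ended
                    while (service.poll() != null) {
                        // nothing to do, the result is intentionally dropped
                    }
                    break;
                }
            }
            return new int[] { received, pool.getQueue().size() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // only for the demo; a reused pool would stay alive
        }
    }
}
```

Calling AbandonBatchDemo.runAndAbandon(50, 3) stops after the fourth failure and leaves nothing in the executor's queue. In real code you would skip the shutdownNow() and keep the pool for the next batch.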