java multithreading concurrency java.util.concurrent completion-service

Java Clear CompletionService Working Queue


I am writing a program which uses a CompletionService to run threaded analyses on a bunch of different objects, where each "analysis" consists of taking in a string and doing some computation to give either true or false as an answer. My code looks essentially like this:

// tasks come from a different method and contain the strings + some other needed info
List<Future<Pair<Pieces,Boolean>>> futures = new ArrayList<>(tasks.size());
for (Task task : tasks) {
    futures.add(executorCompletionService.submit(task));
}
ArrayList<Pair<Pieces, Boolean>> pairs = new ArrayList<>();

int toComplete = tasks.size();
int received = 0;
int failed = 0;
while (received < toComplete) {
    Future<Pair<Pieces, Boolean>> resFuture = executorCompletionService.take();
    received++;
    Pair<Pieces, Boolean> res = resFuture.get();
    if (!res.getValue()) failed++;
    if (failed > 300) {
        // My problem is here
    }

    pairs.add(res);
}

// return pairs and go on to do something else

In the marked section, my goal is to have it abandon the computation if over 300 strings have failed, such that I can move on to a new analysis, calling this method again with some different data. The problem is that since the same CompletionService is used again, if I do not somehow clear the queue, then the worker queue will keep growing as I keep adding more to it every time I use it (since after 300 failures there are likely still many unprocessed strings left).

I have tried to loop through the futures list and cancel all unfinished tasks using something like futures.forEach(future -> future.cancel(true)). However, when I next call the method I get a java.util.concurrent.CancellationException when I call resFuture.get().

(Edit: It seems that even though I call forEach(future -> future.cancel(true)), this does not guarantee that the worker queue is actually clear afterwards. I do not understand why this is. It almost seems as if it takes a while to clear the queue, and the code does not wait for this to happen before moving to the next analysis, so occasionally get() is called on a future which has been cancelled.)
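A possible explanation for the behaviour described in the edit above, offered as a sketch rather than a definitive diagnosis: cancelling a future does not remove its task from the executor's work queue. As far as I can tell from the OpenJDK implementation, the cancelled task stays queued until a worker thread dequeues it and skips it, and only then does the (cancelled) future land on the completion queue, where a later take() can return it. The class and method names below (CancelDemo, countCancelled) are hypothetical and only serve the illustration. Checking isCancelled() before calling get() avoids the CancellationException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class CancelDemo {
    // Submits n slow tasks to a one-thread pool, cancels them all, and counts
    // how many cancelled futures still come out of the completion service.
    static int countCancelled(int n) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CompletionService<Boolean> cs = new ExecutorCompletionService<>(pool);
        List<Future<Boolean>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(cs.submit(() -> { Thread.sleep(10_000); return true; }));
        }
        futures.forEach(f -> f.cancel(true));

        int cancelled = 0;
        for (int i = 0; i < n; i++) {
            Future<Boolean> f = cs.take();   // cancelled futures are delivered too
            if (f.isCancelled()) {
                cancelled++;                 // calling f.get() here would throw CancellationException
            }
        }
        pool.shutdown();
        return cancelled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countCancelled(5) + " cancelled futures were returned by take()");
    }
}
```

If this is what happens in the original program, the stale cancelled futures from one call are simply picked up by take() in the next call, which would match the intermittent exception.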

I have also tried to do

            while (received < toComplete) {
                executorCompletionService.take();
                received++;
            }

to empty the queue. While this works, it is barely faster than just running all of the analyses anyway, so it does little for efficiency.

My question is if there is a better way to empty the worker queue such that when I next call this code it is as if the CompletionService is new again.

Edit: Another method I have tried is simply assigning a new CompletionService to executorCompletionService, which is slightly faster than my other solution but is still rather slow and definitely not good practice.

P.S.: I am also happy to accept any other way of achieving this. I am not attached to using a CompletionService; it has just been the easiest thing for what I've done so far.
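For reference, one alternative along those lines, sketched under assumptions and not taken from the original post: keep a single shared thread pool, but create a fresh ExecutorCompletionService for every batch and cancel the batch's remaining futures on early exit. Because each batch has its own completion queue, cancelled futures from an abandoned batch never show up in the next one, and workers skip cancelled tasks without running them. The names PerBatchDemo, runBatch and demo are hypothetical, and the Boolean result stands in for the Pair<Pieces, Boolean> used above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

class PerBatchDemo {
    // Runs one batch and gives up once more than maxFailures tasks returned false.
    static int runBatch(ExecutorService pool, List<Callable<Boolean>> tasks, int maxFailures)
            throws InterruptedException, ExecutionException {
        // Fresh completion queue per batch; the thread pool itself is reused.
        CompletionService<Boolean> cs = new ExecutorCompletionService<>(pool);
        List<Future<Boolean>> futures = new ArrayList<>(tasks.size());
        for (Callable<Boolean> task : tasks) {
            futures.add(cs.submit(task));
        }
        int failed = 0;
        for (int received = 0; received < tasks.size(); received++) {
            if (!cs.take().get()) failed++;
            if (failed > maxFailures) {
                // Cancel whatever is left; finished futures ignore the call.
                futures.forEach(f -> f.cancel(true));
                break;
            }
        }
        return failed;
    }

    // Runs an always-failing batch followed by an always-passing batch on the same pool.
    static int[] demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Boolean> fails = () -> false;
            Callable<Boolean> passes = () -> true;
            int first = runBatch(pool, Collections.nCopies(100, fails), 3);
            int second = runBatch(pool, Collections.nCopies(10, passes), 3);
            return new int[] {first, second};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] r = demo();
        System.out.println("failures: batch 1 = " + r[0] + ", batch 2 = " + r[1]);
    }
}
```

The trade-off is that cancelled tasks remain in the pool's work queue until a worker reaches them, so this relies on skipping a cancelled task being cheap.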


Solution

  • This has since been resolved, but I have seen other similar questions with no good answer so here is my solution:

    Previously, I was passing a plain ExecutorService to the ExecutorCompletionService(ExecutorService) constructor. I switched the declared type to ThreadPoolExecutor. Since the ExecutorService was already a ThreadPoolExecutor in the backend (this holds for pools created with factories such as Executors.newFixedThreadPool), a cast is enough to fix all the method signatures. Working with the ThreadPoolExecutor directly gives you much more control, and specifically you can call threadPoolExecutor.getQueue().clear(), which removes all tasks that are still waiting to be started. Finally, I needed to make sure to "drain" the tasks that were already running, so my final cancelling code looked like this:

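            if (failed > maxFailures) {
                // executorService is the ThreadPoolExecutor; drop tasks that have not started yet
                executorService.getQueue().clear();
                // wait for the tasks that are already running, discarding their results
                while (executorService.getActiveCount() > 0) {
                    executorCompletionService.poll();
                }
            }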

    At the end of this code block, the executor will be ready to run again.
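    A self-contained sketch of the same idea, with a few assumptions of my own: the helper name reset and the class ResetDemo are hypothetical, the timed poll replaces the busy loop so the waiting thread does not spin, and a final non-blocking drain is added because results that finished but were never taken may still sit in the completion queue once the active count reaches zero. Note that getActiveCount() is documented as returning an approximate value, so treat this as illustrative rather than airtight.

```java
import java.util.concurrent.*;

class ResetDemo {
    // Drops queued tasks, waits for running ones, then empties the completion queue.
    static void reset(ThreadPoolExecutor pool, CompletionService<?> cs) throws InterruptedException {
        pool.getQueue().clear();                    // tasks that never started are discarded
        while (pool.getActiveCount() > 0) {         // approximate count of running tasks
            cs.poll(10, TimeUnit.MILLISECONDS);     // discard results while waiting
        }
        while (cs.poll() != null) {
            // discard results that completed but were never taken
        }
    }

    // Returns how much work is still visible after a reset (0 means a clean slate).
    static int leftoverAfterReset() throws InterruptedException {
        // The cast works because newFixedThreadPool returns a ThreadPoolExecutor.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        CompletionService<Boolean> cs = new ExecutorCompletionService<>(pool);
        for (int i = 0; i < 50; i++) {
            cs.submit(() -> { Thread.sleep(200); return false; });
        }
        cs.take();            // consume one result, as the main loop would
        Thread.sleep(50);     // let the workers get well into their next tasks
        reset(pool, cs);
        int leftover = pool.getQueue().size() + pool.getActiveCount()
                + (cs.poll() == null ? 0 : 1);
        pool.shutdown();
        return leftover;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("leftover work after reset: " + leftoverAfterReset());
    }
}
```

    One caveat with clearing the queue this way: futures for the removed tasks never complete, so nothing should call get() on them afterwards.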