I have read a lot of posts about ExecutorService, but I can't find a way to do what I need.
I need some concurrent tasks. When any of them throws a custom exception, all the remaining tasks should be canceled.
This is an example of what I did. The tasks run concurrently, but they aren't interrupted when the exception is thrown.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        List<Future> futures = new ArrayList<Future>();
        // Task 1: the long-running task
        futures.add(executorService.submit(new Callable<Void>() {
            public Void call() throws Exception {
                Thread.sleep(5000);
                System.out.println("Task 1 done");
                return null;
            }
        }));
        // Task 2: the shorter task that fails
        futures.add(executorService.submit(new Callable<Void>() {
            public Void call() throws Exception {
                Thread.sleep(2000);
                System.out.println("Task 2 done");
                if (true) {
                    throw new CustomException("Error on task 2");
                }
                return null;
            }
        }));
        executorService.shutdown();
        try {
            executeFutures(futures);
        } catch (CustomException ex) {
            System.out.println("Received:" + ex.getMessage());
            executorService.shutdownNow();
        }
    }

    private static void executeFutures(List<Future> futures) throws CustomException {
        try {
            for (Future f : futures) {
                f.get();
            }
        } catch (ExecutionException | InterruptedException e) {
            if (e.getCause() instanceof CustomException) {
                throw (CustomException) e.getCause();
            }
        }
    }
}
This is the output:
Task 2 done //exception is thrown here but task 1 continues.
Task 1 done
Received:Error on task 2
Any help will be appreciated.
Your problem is that executeFutures makes the main thread call f.get() on the first Future, which corresponds to the long task, so the main thread waits for the full duration of that task, at least 5 seconds, whatever happens. Only then does it call f.get() on the second Future, which has already completed, so it immediately gets the CustomException from the ExecutionException and calls executorService.shutdownNow(). By then it is too late: there are no tasks left to interrupt.
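To make the ordering issue concrete, here is a minimal sketch that measures how long the caller waits before it learns about a failure when it calls get() in submission order. The class name BlockingGetDemo, the method millisUntilFailureSeen, and the use of IllegalStateException in place of CustomException are illustrative assumptions, not part of the original code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingGetDemo {

    // Returns the elapsed milliseconds before the caller observes the failure
    // of the fast task, when it waits on the futures in submission order.
    static long millisUntilFailureSeen(long slowMillis, long fastMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callable<Void> slowTask = () -> {
            Thread.sleep(slowMillis);
            return null;
        };
        Callable<Void> failingTask = () -> {
            Thread.sleep(fastMillis);
            // Stand-in for CustomException (assumption for this sketch)
            throw new IllegalStateException("boom");
        };
        long start = System.nanoTime();
        Future<Void> slow = pool.submit(slowTask);
        Future<Void> failing = pool.submit(failingTask);
        pool.shutdown();
        try {
            slow.get();    // blocks until the slow task has finished
            failing.get(); // the task failed long ago, so this throws at once
        } catch (ExecutionException e) {
            // Only at this point does the caller learn about the failure.
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Failure seen after ~"
                + millisUntilFailureSeen(5000, 2000) + " ms");
    }
}
```

With the delays from the question, the reported time should be close to the 5 seconds of the slow task rather than the 2 seconds of the failing one.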
What you could do is use a decorator of type Callable that automatically shuts down the thread pool when a CustomException is thrown. This way the thread pool is shut down directly by the thread that executed the failing task, instead of by the main thread.
Something like this:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

public class AutoShutdown<V> implements Callable<V> {
    private final ExecutorService executorService;
    private final Callable<V> task;

    public AutoShutdown(final ExecutorService executorService, final Callable<V> task) {
        this.executorService = executorService;
        this.task = task;
    }

    @Override
    public V call() throws Exception {
        try {
            return task.call();
        } catch (CustomException e) {
            // Shut the pool down from the failing task's own thread,
            // which interrupts the tasks that are still running
            executorService.shutdownNow();
            throw e;
        }
    }
}
Then you will need to submit your tasks through the decorator as follows:
futures.add(
    executorService.submit(
        new AutoShutdown<>(
            executorService,
            new Callable<Void>() {
                public Void call() throws Exception {
                    Thread.sleep(5000);
                    System.out.println("Task 1 done");
                    return null;
                }
            }
        )
    )
);
futures.add(
    executorService.submit(
        new AutoShutdown<>(
            executorService,
            new Callable<Void>() {
                public Void call() throws Exception {
                    Thread.sleep(2000);
                    System.out.println("Task 2 done");
                    if (true) {
                        throw new CustomException("Error on task 2");
                    }
                    return null;
                }
            }
        )
    )
);
Output:
Task 2 done
As the output shows, task 1 was interrupted before it could complete.
The message "Received:Error on task 2" was not printed, so it looks like a successful execution, which is not the case.
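No, that is only because the first call to f.get() now fails as expected: task 1 was interrupted, so its InterruptedException reaches the caller wrapped in an ExecutionException. Its cause is not a CustomException, so nothing is rethrown, and because the catch is performed outside the loop, executeFutures exits without ever checking the second Future. Move the try/catch inside the loop as follows: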
private static void executeFutures(List<Future> futures) throws CustomException {
    for (Future f : futures) {
        try {
            f.get();
        } catch (ExecutionException | InterruptedException e) {
            if (e.getCause() instanceof CustomException) {
                throw (CustomException) e.getCause();
            }
        }
    }
}
Output:
Task 2 done
Received:Error on task 2
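For reference, the pieces above can be combined into one self-contained sketch. CustomException is never shown in the post, so the definition below is an assumption (a plain checked exception). The class name AutoShutdownDemo, the run method with its delay parameters, and the log list used instead of printing from the tasks are also only there for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AutoShutdownDemo {

    // Assumed definition: the post never shows CustomException.
    static class CustomException extends Exception {
        CustomException(String message) {
            super(message);
        }
    }

    // The decorator from the answer, nested so the sketch fits in one file.
    static class AutoShutdown<V> implements Callable<V> {
        private final ExecutorService executorService;
        private final Callable<V> task;

        AutoShutdown(ExecutorService executorService, Callable<V> task) {
            this.executorService = executorService;
            this.task = task;
        }

        @Override
        public V call() throws Exception {
            try {
                return task.call();
            } catch (CustomException e) {
                executorService.shutdownNow(); // interrupts the other running tasks
                throw e;
            }
        }
    }

    // try/catch inside the loop, so an interrupted task does not hide a later failure.
    static void executeFutures(List<Future<Void>> futures) throws CustomException {
        for (Future<Void> f : futures) {
            try {
                f.get();
            } catch (ExecutionException | InterruptedException e) {
                if (e.getCause() instanceof CustomException) {
                    throw (CustomException) e.getCause();
                }
            }
        }
    }

    // Runs both tasks and returns the messages in the order they were recorded.
    static List<String> run(long slowMillis, long failingMillis) {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Callable<Void> slowTask = () -> {
            Thread.sleep(slowMillis);
            log.add("Task 1 done");
            return null;
        };
        Callable<Void> failingTask = () -> {
            Thread.sleep(failingMillis);
            log.add("Task 2 done");
            throw new CustomException("Error on task 2");
        };
        List<Future<Void>> futures = new ArrayList<>();
        futures.add(executorService.submit(new AutoShutdown<Void>(executorService, slowTask)));
        futures.add(executorService.submit(new AutoShutdown<Void>(executorService, failingTask)));
        executorService.shutdown();
        try {
            executeFutures(futures);
        } catch (CustomException ex) {
            log.add("Received:" + ex.getMessage());
            executorService.shutdownNow();
        }
        return log;
    }

    public static void main(String[] args) {
        run(5000, 2000).forEach(System.out::println);
    }
}
```

With the delays from the question, this should print the same two lines as the output above, after roughly two seconds instead of five.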