Tags: java, spring, concurrency, executorservice, java.util.concurrent

How to stop Callable tasks in an ExecutorService if an exception occurs


I'm trying to implement a sample application to test Callable and ExecutorService interfaces.

In my app I have:

@Bean("fixedThreadPool")
public ExecutorService fixedThreadPool() {
    return Executors.newFixedThreadPool(5);
}

Then:

public void removeUserIds(Set<String> userIds) {
    UriComponentsBuilder builder = UriComponentsBuilder.fromUriString("http://localhost:8080/remove");
    final List<Callable<String>> callables = new ArrayList<>();  
    userIds.forEach(userId -> {
        final Callable<String> task = () -> callServiceToRemove(builder,userId); //Call to remote service using RestTemplate
        callables.add(task);
    });

    try {
        final List<Future<String>> futureList =
            executor.invokeAll(callables);

        futureList.forEach(future -> {
            try {
                log.info(future.get());
            } catch (final Exception e) {
                log.error("Error : "+ e.getMessage());
            } 
        });
    } catch (final Exception e) {
        log.error("Error Error Error : "+ e.getMessage());
    } finally {
        executor.shutdown();
    }
}

When I call removeUserIds() with 100 user IDs, it works fine in the happy path, but if the service is unavailable or down, the error is logged 100 times. I am not able to stop the remaining tasks once the service is found to be unavailable, so that no further calls are made to it. Could anyone help me fix this: how can I stop the remaining executions if the service is down, or what would be a feasible alternative?


Solution

  • This is more of a design problem than a coding issue. There are several possible approaches; here is one:

    Use a global flag

    Check a global boolean flag, say Globals.serviceUnavailable, before actually firing the remote service call. The flag is set by the first task(s) that encounter the remote error. Here are the changes to the code:

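    final Callable<String> task = () -> {
        // Skip the remote call once another task has reported the service as down.
        if (Globals.serviceUnavailable) {
            return "Skipped " + userId;
        }
        try {
            return callServiceToRemove(builder, userId);
        } catch (ServiceUnavailableException e) { // Or whatever your exception is
            Globals.serviceUnavailable = true; // Notify the remaining tasks that the service is unavailable.
            throw e; // Still surfaces through future.get() for this task
        }
    };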
    

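    Of course, the flag is read and written from several pool threads, so you have to decide how much synchronization it needs. At minimum, declare Globals.serviceUnavailable as volatile (or use an AtomicBoolean) so that an update made by one thread is visible to the others. Stronger locking is not necessary if you are OK with partial success when removing the batch of user IDs.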
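As a minimal sketch of the flag holder: the answer only names Globals.serviceUnavailable and does not show its definition, so the class layout below is an assumption. A volatile field is enough here because the flag only ever flips from false to true, and no task needs a compound read-then-write.

```java
// Hypothetical holder class; only the field name comes from the answer above.
final class Globals {
    // volatile: a write by one pool thread is visible to later reads on other threads.
    static volatile boolean serviceUnavailable = false;

    private Globals() {
        // Not meant to be instantiated.
    }
}
```

If the same JVM processes more than one batch, the flag has to be reset to false before each batch, otherwise every later batch is skipped entirely.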

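    Also, this only helps when the thread pool is much smaller than the number of submitted tasks, which is the case here (5 threads, 100 tasks). Calls that are already in flight when the flag is set still run to completion; only tasks that have not started yet are skipped.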
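The effect of the pool size can be checked with a small self-contained sketch. Everything in it is illustrative rather than taken from the question: FlagDemo, the simulated callServiceToRemove that always fails, and the call counter are assumptions, and an AtomicBoolean stands in for Globals.serviceUnavailable. Because every simulated call fails, each pool thread can attempt at most one call before it sets the flag, so the number of attempts never exceeds the pool size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class FlagDemo {
    // Shared flag (stand-in for Globals.serviceUnavailable).
    static final AtomicBoolean serviceUnavailable = new AtomicBoolean(false);
    // Counts how often the simulated remote service was actually called.
    static final AtomicInteger remoteCalls = new AtomicInteger();

    // Hypothetical stand-in for the real remote call: always fails, as if the service were down.
    static String callServiceToRemove(String userId) {
        remoteCalls.incrementAndGet();
        throw new IllegalStateException("service unavailable for " + userId);
    }

    // Runs taskCount tasks on poolSize threads and returns how many remote calls were attempted.
    static int run(int poolSize, int taskCount) {
        serviceUnavailable.set(false);
        remoteCalls.set(0);

        List<Callable<String>> callables = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            String userId = "user-" + i;
            callables.add(() -> {
                if (serviceUnavailable.get()) {
                    return "skipped " + userId; // no remote call once the flag is set
                }
                try {
                    return callServiceToRemove(userId);
                } catch (IllegalStateException e) {
                    serviceUnavailable.set(true); // tell the remaining tasks to skip
                    throw e;
                }
            });
        }

        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            executor.invokeAll(callables); // blocks until every task has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return remoteCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("remote calls attempted: " + run(5, 100));
    }
}
```

With a pool of 5 and 100 tasks, between 1 and 5 calls are attempted and the rest are skipped. With a pool of 100, all 100 tasks could start before the first failure is recorded, and the flag would save nothing.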