Search code examples
javajava.util.concurrentcompletion-service

Enforce executorService.awaitTermination while using CompletionService


I am trying to submit multiple tasks and obtain the results as and when it is available. However, after the end of the loop, I have to enforce that all the tasks complete within specified amount of time. If not, throw an error. Initially, all I had was executorService's invokeAll, shutdown and awaitTermination calls that were used to ensure that all tasks complete (inspite of errors or not). I migrated the code to use CompletionService to display the results. Where can I enforce the awaitTermination clause in the CompletionService calls?

 CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
            logger.info("Submitting all tasks");
            for (Callable<String> task : tasks)
                completionService.submit(task);
            executor.shutdown();
            logger.info("Tasks submitted. Now checking the status.");
            while (!executor.isTerminated())
            {
                final Future<String> future = completionService.take();
                String itemValue;
                try
                {
                    itemValue = future.get();
                    if (!itemValue.equals("Bulk"))
                        logger.info("Backup completed for " + itemValue);
                }
                catch (InterruptedException | ExecutionException e)
                {
                    String message = e.getCause().getMessage();
                    String objName = "Bulk";
                    if (message.contains("(") && message.contains(")"))
                        objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
                    logger.error("Failed retrieving the task status for " + objName, e);
                }
            }
executor.awaitTermination(24, TimeUnit.HOURS);

In other words, how can I utilize timeout for CompletionService?

EDIT:

The initial code I had was displayed below. The problem is that I am iterating through the future list and then printing them as completed. However, my requirement is to display the ones that were completed at a FCFS basis.

List<Future<String>> results = executor.invokeAll(tasks);
        executor.shutdown();
        executor.awaitTermination(24, TimeUnit.HOURS);

        while (results.size() > 0)
        {
            for (Iterator<Future<String>> iterator = results.iterator(); iterator.hasNext();)
            {
                Future<String> item = iterator.next();
                if (item.isDone())
                {
                    String itemValue;
                    try
                    {
                        itemValue = item.get();
                        if (!itemValue.equals("Bulk"))
                            logger.info("Backup completed for " + itemValue);
                    }
                    catch (InterruptedException | ExecutionException e)
                    {
                        String message = e.getCause().getMessage();
                        String objName = "Bulk";
                        if (message.contains("(") && message.contains(")"))
                            objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
                        logger.error("Failed retrieving the task status for " + objName, e);
                    }
                    finally
                    {
                        iterator.remove();
                    }
                }
            }
        }

Solution

  • I'd suggest you wait for the executor to terminate on another thread

    That way you can achieve serving results FCFS and also enforce the timeout.

    It can be easily achieved with something that will look like the following

    CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
    
    // place all the work in a function (an Anonymous Runnable in this case)
    // completionService.submit(() ->{work});
    // as soon as the work is submitted it is handled by another Thread
    
    completionService.submit(() ->{
        logger.info("Submitting all tasks");
        for (Callable<String> task : tasks)
        completionService.submit(task);
        logger.info("Tasks submitted. Now checking the status.");
        int counter = tasks.size();
        for(int i = counter; counter >=1; counter--)  // Replaced the while loop
        {
            final Future<String> future = completionService.take();
            String itemValue;
            try
            {
                itemValue = future.get();
                if (!itemValue.equals("Bulk"))
                    logger.info("Backup completed for " + itemValue);
            }
            catch (InterruptedException | ExecutionException e)
            {
                String message = e.getCause().getMessage();
                String objName = "Bulk";
                if (message.contains("(") && message.contains(")"))
                    objName = message.substring(message.indexOf("(") + 1, message.indexOf(")"));
                logger.error("Failed retrieving the task status for " + objName, e);
            }
        }
    });
    
    // After submitting the work to another Thread
    // Wait in your Main Thread, and enforce termination if needed
    shutdownAndAwaitTermination(executor);
    

    You handle the executors termination && waiting using this (taken from ExecutorsService)

     void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
         // Wait a while for existing tasks to terminate
         if (!pool.awaitTermination(24, TimeUnit.HOURS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
               System.err.println("Pool did not terminate");
         }
       } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
       }
     }