
How do I know when an ExecutorService has finished if items on the ES can resubmit to the ES?


My Java application works on music files within folders. It is designed to process multiple folders in parallel and independently. To do this, each folder is processed by an ExecutorService whose maximum pool size matches the number of CPUs on the computer.

For example, on an 8-CPU computer eight folders can (in theory) be processed concurrently, and on a 16-CPU computer 16 folders can be processed concurrently. If we only have 1 CPU, we set the pool size to 3 so that the CPU can keep doing something useful if one folder is blocked on I/O.
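The sizing rule above can be sketched as a small helper. The class and method names are hypothetical, and the sketch assumes the CPU count comes from `Runtime.availableProcessors()`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSizing {
    // One worker per CPU, but at least 3 on a single-CPU machine so another
    // folder can make progress while one is blocked on I/O.
    public static int poolSizeFor(int cpus) {
        return cpus == 1 ? 3 : cpus;
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // A fixed pool: core size == maximum size == poolSizeFor(cpus)
        ExecutorService pool = Executors.newFixedThreadPool(poolSizeFor(cpus));
        System.out.println("CPUs: " + cpus + ", pool size: " + poolSizeFor(cpus));
        pool.shutdown();
    }
}
```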

However, we don't actually have just one ExecutorService; we have several, because each folder can go through a number of stages.

Process1 (uses ExecutorService1) → Process2 (ExecutorService2) → Process3 (ExecutorService3)

Process1, Process2, Process3 etc. all implement Callable, and each has its own associated ExecutorService. There is a FileLoader process that we kick off; it loads folders, creates a Process1 callable for each folder and submits it to the Process1 executor. Each Process1 callable does its work and then submits to a different callable, which may be Process2, Process3 and so on, but we never go backwards (e.g. Process3 will never submit to Process1). We actually have 12 processes, but any particular folder is unlikely to go through all 12.

But I realized that this is flawed: on a 16-CPU computer each ES can have a pool size of 16, so with three stages we actually have 48 threads running, which will just lead to too much contention.

So what I was going to do was have all processes (Process1, Process2…) use the same ExecutorService; that way we only ever have as many worker threads as CPUs.

However, in my current setup we have a SongLoader process that has just one task submitted (loading all folders), and we then call shutdown() on its executor. Waiting for that executor to terminate won't complete until everything has been submitted to Process1; then the Process1 executor won't terminate until everything has been sent to Process2, and so on:

    // Init services, one per stage
    List<ExecutorService> services = new ArrayList<>();
    services.add(songLoaderService);
    services.add(Process1.getExecutorService());
    services.add(Process2.getExecutorService());
    services.add(Process3.getExecutorService());

    for (ExecutorService service : services) {
        // Request shutdown: already-submitted tasks still run, new submissions are rejected
        service.shutdown();

        // Now wait for all submitted tasks to complete (throws InterruptedException)
        service.awaitTermination(10, TimeUnit.DAYS);
    }
 //...............
 //Finish Off work

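However, if everything were on the same ES and Process1 were submitting to Process2, this would no longer work: at the time shutdown() was called, Process1 would not yet have submitted all of its folders to Process2, so the executor would be shut down prematurely and those later submissions would be rejected (with the default policy, submit() throws RejectedExecutionException once shutdown() has been called).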
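The premature shutdown can be reproduced with a minimal, self-contained demonstration. The class name is hypothetical and the two stages are simulated with a latch: a stage-1 task is still running when shutdown() is called, so its hand-off to stage 2 is refused with a RejectedExecutionException, which is the default behaviour of the pool returned by Executors.newFixedThreadPool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PrematureShutdownDemo {
    // Returns true if the hypothetical "Process2" hand-off was rejected.
    public static boolean secondStageWasRejected() throws InterruptedException {
        ExecutorService shared = Executors.newFixedThreadPool(2);
        CountDownLatch shutdownCalled = new CountDownLatch(1);
        AtomicBoolean rejected = new AtomicBoolean(false);

        shared.submit(() -> {                        // "Process1", accepted before shutdown()
            try {
                shutdownCalled.await();              // still working when shutdown() is called
                shared.submit(() -> { /* "Process2" work */ });
            } catch (RejectedExecutionException e) {
                rejected.set(true);                  // the stage-2 task never runs
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        shared.shutdown();                           // main thread thinks all work is submitted
        shutdownCalled.countDown();                  // let "Process1" try its hand-off
        shared.awaitTermination(10, TimeUnit.SECONDS);
        return rejected.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Process2 task rejected: " + secondStageWasRejected());
    }
}
```

The already-running task is allowed to finish, but anything it submits after shutdown() is lost, which is exactly the failure described above.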

So how do I detect when all work has been completed using a single ExecutorService, when tasks on that ES can submit other tasks to the same ES?

Or is there a better approach?

Note: you might wonder why I don't just merge the logic of Process1, 2 and 3 into a single process. The difficulty is that although I initially group songs by folder, sometimes the songs get split into smaller groups that are allocated to separate processes further down the line, and not necessarily the same process; there are actually 12 processes in total.

Attempt based on Shloim's idea

Main Thread

    private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
    private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
    ...
    SongLoader loader = SongLoader.getInstanceOf(parentFolder);
    ExecutorService songLoaderService =  SongLoader.getExecutorService();
    songLoaderService.submit(loader);
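    // NOTE: worker threads keep adding to 'futures' while this loop iterates,
    // which is what triggers the ConcurrentModificationException shown below
    for(Future future : futures)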
    {
        try
        {
             future.get();
        }
        catch (InterruptedException ie)
        {
            SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
            getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
        }
        catch(ExecutionException e)
        {
            SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
        }
    }
    songLoaderService.shutdown();

with the Process code submitting new tasks using this method from MainAnalyserService:

    public void submit(Callable<Boolean> task) //throws Exception
    {
        FixSongsController.getFutures().add(getExecutorService().submit(task));
    }

It looked like it was working, but it failed with:

java.util.ConcurrentModificationException
    at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
    at java.base/java.util.ArrayList$Itr.next(Unknown Source)
    at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
    at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

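and I now realize I cannot have one thread iterating over the list and calling future.get() (which blocks until that task is done) while other threads are adding to the same list. The iterator behind Collections.synchronizedList is the underlying ArrayList's fail-fast iterator, so the first next() after a concurrent add() throws ConcurrentModificationException.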
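One way to keep the futures-based approach is to replace the list with a `ConcurrentLinkedQueue` and have the main thread poll it until it is empty. This works because a task adds the futures of the tasks it submits before it returns, so by the time `get()` on a parent's future returns, its children's futures are already in the queue. This is a sketch under assumptions: `FutureDrainer` is a hypothetical name, every task is submitted through its `submit()` method, and tasks only submit follow-up work from inside their own `call()`, not later from some other thread.

```java
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class FutureDrainer {
    private final ExecutorService executor;
    // Thread-safe queue instead of an ArrayList: adding while another thread polls is fine.
    private final Queue<Future<?>> futures = new ConcurrentLinkedQueue<>();

    public FutureDrainer(ExecutorService executor) {
        this.executor = executor;
    }

    // Used for every task, including tasks submitted from inside other tasks.
    public void submit(Callable<?> task) {
        futures.add(executor.submit(task));
    }

    // Main thread only: take futures one at a time until none are left. A parent's
    // children are queued before the parent returns, so once get() on the parent
    // returns they are already visible here.
    public void awaitAll() throws InterruptedException {
        Future<?> future;
        while ((future = futures.poll()) != null) {
            try {
                future.get();
            } catch (ExecutionException e) {
                System.err.println("Task failed: " + e.getCause()); // log and keep draining
            }
        }
    }

    // Hypothetical two-stage run: "Process1" hands work to "Process2" on the same pool.
    public static int runTwoStages() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        FutureDrainer drainer = new FutureDrainer(pool);
        AtomicInteger stagesRun = new AtomicInteger();
        drainer.submit(() -> {                    // Process1
            stagesRun.incrementAndGet();
            drainer.submit(() -> {                // Process2, submitted from inside Process1
                stagesRun.incrementAndGet();
                return null;
            });
            return null;
        });
        drainer.awaitAll();
        pool.shutdown();                          // safe now: nothing is left to submit more work
        return stagesRun.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stages run: " + runTwoStages()); // prints "stages run: 2"
    }
}
```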


Solution

  • I agree with Shloim that you don't need multiple ExecutorService instances here: just one (sized to the number of CPUs you have available) is sufficient and actually optimal. In fact, you might not need an ExecutorService at all; a plain Executor can do the job if you use an external mechanism for signaling completion.

    I would start by building a class to represent the entirety of a larger work item. If you need to consume the results from each child work item, you could use a queue, but if you just want to know if there is work left to do, you only need a counter.

    For example, you could do something like this:

    import java.io.File;
    import java.util.concurrent.Executor;

    public class FolderWork implements Runnable {
        private final Executor executor;
        private final File folder;

        // Guarded by the monitor lock on this instance. Starts at 1 because the folder
        // listing in run() is itself pending work; otherwise awaitCompletion() could return
        // before run() has enqueued anything, or in the gap between two enqueues.
        private int pendingItems = 1;

        public FolderWork(Executor executor, File folder) {
            this.executor = executor;
            this.folder = folder;
        }

        @Override
        public void run() {
            try {
                File[] files = folder.listFiles();   // null if folder is not a readable directory
                if (files != null) {
                    for (File file : files) {
                        enqueueMoreWork(file);
                    }
                }
            } finally {
                markWorkItemCompleted();             // the listing step itself is done
            }
        }

        public synchronized void enqueueMoreWork(File file) {
            pendingItems++;
            executor.execute(new FileWork(file, this));
        }

        public synchronized void markWorkItemCompleted() {
            pendingItems--;
            notifyAll();
        }

        public synchronized boolean hasPendingWork() {
            return pendingItems > 0;
        }

        public synchronized void awaitCompletion() throws InterruptedException {
            while (pendingItems > 0) {
                wait();
            }
        }
    }

    public class FileWork implements Runnable {
        private final File file;
        private final FolderWork parent;

        public FileWork(File file, FolderWork parent) {
            this.file = file;
            this.parent = parent;
        }

        @Override
        public void run() {
            try {
                // Do some work with the file. If that turns up more work, enqueue it on
                // the same parent before this item is marked complete, e.g.:
                //     parent.enqueueMoreWork(anotherFile);
            } finally {
                parent.markWorkItemCompleted();
            }
        }
    }
    

    If you're worried about synchronization overhead for the pendingItems counter, you can use an AtomicInteger for it instead. Then you need a separate mechanism for notifying a waiting thread that we are done; for example, you can use a CountDownLatch. Here's an example implementation:

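    import java.io.File;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executor;
    import java.util.concurrent.atomic.AtomicInteger;

    public class FolderWork implements Runnable {
        private final Executor executor;
        private final File folder;

        // Starts at 1 because the folder listing in run() is itself pending work; otherwise
        // the count could hit 0 between two enqueues (releasing the latch early) or never
        // change at all for an empty folder (so the latch would never be released).
        private final AtomicInteger pendingItems = new AtomicInteger(1);
        private final CountDownLatch latch = new CountDownLatch(1);

        public FolderWork(Executor executor, File folder) {
            this.executor = executor;
            this.folder = folder;
        }

        @Override
        public void run() {
            try {
                File[] files = folder.listFiles();   // null if folder is not a readable directory
                if (files != null) {
                    for (File file : files) {
                        enqueueMoreWork(file);
                    }
                }
            } finally {
                markWorkItemCompleted();             // the listing step itself is done
            }
        }

        public void enqueueMoreWork(File file) {
            if (latch.getCount() == 0) {
                throw new IllegalStateException(
                    "Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
            }
            pendingItems.incrementAndGet();
            executor.execute(new FileWork(file, this));
        }

        public void markWorkItemCompleted() {
            int remainingItems = pendingItems.decrementAndGet();
            if (remainingItems == 0) {
                latch.countDown();
            }
        }

        public boolean hasPendingWork() {
            return pendingItems.get() > 0;
        }

        public void awaitCompletion() throws InterruptedException {
            latch.await();
        }
    }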
    

    You would call this like so:

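    ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    FolderWork topLevel = new FolderWork(executor, new File(...));
    executor.execute(topLevel);
    topLevel.awaitCompletion();   // throws InterruptedException
    executor.shutdown();          // no more work can arrive, so shutting down is now safe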
    

    This example only shows one level of child work items, but child items can be nested to any depth, as long as every item enqueues its follow-up work through the same FolderWork so that they all share one pendingItems counter to keep track of how much work is left to do.
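    Taken a step further, the counter can live in a small wrapper around the executor, so tasks at any depth (for example the question's Process1 → Process2 → Process3 chain) submit follow-up work through it. This is a sketch under assumptions: `CompletionTracker` and the simulated pipeline are hypothetical, and the caller is expected to call `awaitCompletion()` exactly once, after submitting the root tasks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletionTracker {
    private final Executor executor;
    // Starts at 1: the caller's "submission phase" counts as pending work, so the
    // counter cannot reach 0 before the root tasks have all been submitted.
    private final AtomicInteger pending = new AtomicInteger(1);
    private final CountDownLatch done = new CountDownLatch(1);

    public CompletionTracker(Executor executor) {
        this.executor = executor;
    }

    // Used for root tasks and for follow-up tasks submitted from inside running tasks.
    public void submit(Runnable task) {
        pending.incrementAndGet();
        try {
            executor.execute(() -> {
                try {
                    task.run();
                } finally {
                    finishOne();
                }
            });
        } catch (RuntimeException e) {   // e.g. RejectedExecutionException: undo the count
            finishOne();
            throw e;
        }
    }

    private void finishOne() {
        if (pending.decrementAndGet() == 0) {
            done.countDown();
        }
    }

    // Call exactly once, from the thread that submitted the root tasks.
    public void awaitCompletion() throws InterruptedException {
        finishOne();                     // release the initial "submission phase" count
        done.await();
    }

    // Hypothetical pipeline: every folder goes through Process1; even-numbered folders
    // continue to Process2 and Process3 (never backwards), all on one CPU-sized pool.
    public static int runPipeline(int folders) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        CompletionTracker tracker = new CompletionTracker(pool);
        AtomicInteger stageRuns = new AtomicInteger();
        for (int i = 0; i < folders; i++) {
            final int folder = i;
            tracker.submit(() -> {                                  // Process1
                stageRuns.incrementAndGet();
                if (folder % 2 == 0) {
                    tracker.submit(() -> {                          // Process2
                        stageRuns.incrementAndGet();
                        tracker.submit(stageRuns::incrementAndGet); // Process3
                    });
                }
            });
        }
        tracker.awaitCompletion();
        pool.shutdown();                 // every stage has finished, so this cannot cut work short
        return stageRuns.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stage tasks run: " + runPipeline(10)); // prints "stage tasks run: 20"
    }
}
```

    Starting the counter at 1 and releasing that count in `awaitCompletion()` is what lets a run with no folders at all finish immediately instead of waiting forever.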