Tags: java, multithreading, concurrency, producer-consumer, java.util.concurrent

Thread pool where workers are both producers and consumers


I have an unbounded queue of jobs which can be processed asynchronously. The processing of each job may or may not trigger the creation of new jobs for this queue.

I would like a pool of several worker threads to take items from this queue and process them in parallel, until both the queue is empty and all worker threads are idle waiting for new jobs on the queue (as a busy worker could end up adding new jobs to the queue).

Is there a recipe built on the java.util.concurrent classes that solves this particular problem, where workers are also producers? It is not clear from the APIs that this scenario is supported in a straightforward way.

In particular, I want to be able to detect the termination condition, namely, when no more jobs are available (empty job queue) and there will be no more jobs produced (all idle worker threads).

EDIT

Nam San's answer below appears to be the most elegant approach. It boils down to tracking the number of submitted jobs against the number of completed jobs, and treating the point where the two numbers are equal as the termination condition.

I've implemented a full example using java.util.concurrent implementations which extends ThreadPoolExecutor to achieve this, plus specialises the job queue to accept Comparable instances which are sorted in a particular way.

  • TestExecutor.java: A custom executor which extends ThreadPoolExecutor but has additional methods to execute jobs which may create new jobs, and a new await method which waits until all submitted jobs are complete.
  • WorkUnit.java: An example of a comparable, runnable job which may create new jobs to submit to TestExecutor.
  • Test.java: Contains a main method to run an example using WorkUnit instances with a TestExecutor.
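
The three files listed above are not reproduced here. As a rough sketch of the idea only, and not the actual TestExecutor or WorkUnit code, the following shows one way a ThreadPoolExecutor subclass could count unfinished jobs using the afterExecute hook, with a PriorityBlockingQueue ordering the queued jobs. The names PriorityPoolSketch, CountingPool, Job, and runTree are hypothetical, and the ordering rule (fewest remaining levels first) is an arbitrary assumption chosen for illustration.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PriorityPoolSketch {

    /** Hypothetical stand-in for TestExecutor: counts jobs submitted but not yet finished. */
    static class CountingPool extends ThreadPoolExecutor {
        private final Object lock = new Object();
        private long pending = 0; // guarded by lock

        CountingPool(int threads) {
            // Queued jobs are ordered by compareTo, so every job given to
            // execute() must be Comparable. submit() would wrap jobs in a
            // non-comparable FutureTask, so this sketch supports execute() only.
            super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                  new PriorityBlockingQueue<Runnable>());
        }

        @Override
        public void execute(Runnable job) {
            synchronized (lock) {
                pending++; // count the job before a worker can possibly finish it
            }
            try {
                super.execute(job);
            } catch (RejectedExecutionException e) {
                finished(); // the job will never run, so undo the count
                throw e;
            }
        }

        @Override
        protected void afterExecute(Runnable job, Throwable failure) {
            super.afterExecute(job, failure);
            finished(); // called whether the job returned normally or threw
        }

        private void finished() {
            synchronized (lock) {
                if (--pending == 0) {
                    lock.notifyAll();
                }
            }
        }

        /** Blocks until every job submitted so far, and every job those jobs spawned, is done. */
        void await() throws InterruptedException {
            synchronized (lock) {
                while (pending > 0) {
                    lock.wait();
                }
            }
        }
    }

    /** Hypothetical stand-in for WorkUnit: jobs with fewer remaining levels are dequeued first. */
    static class Job implements Runnable, Comparable<Job> {
        final int levels;
        final CountingPool pool;
        final AtomicInteger ran;

        Job(int levels, CountingPool pool, AtomicInteger ran) {
            this.levels = levels;
            this.pool = pool;
            this.ran = ran;
        }

        public void run() {
            ran.incrementAndGet();
            if (levels > 0) {
                // A running job acts as a producer by submitting two children.
                pool.execute(new Job(levels - 1, pool, ran));
                pool.execute(new Job(levels - 1, pool, ran));
            }
        }

        public int compareTo(Job other) {
            return Integer.compare(levels, other.levels);
        }
    }

    /** Runs a binary tree of jobs and returns how many jobs actually ran. */
    static int runTree(int threads, int levels) {
        CountingPool pool = new CountingPool(threads);
        AtomicInteger ran = new AtomicInteger();
        try {
            pool.execute(new Job(levels, pool, ran));
            pool.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown(); // worker threads are non-daemon; release them
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("jobs run: " + runTree(2, 3)); // 1 + 2 + 4 + 8 = 15
    }
}
```

Because a job submits its children from inside run(), before afterExecute is called for it, the pending count cannot reach zero while any job is still able to produce more work.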

Solution

  • The code below demonstrates how you could use a wrapper class around an Executor to count the number of submitted jobs and compare it to the number of completed jobs to achieve what you want. Note that your tasks must call the execute method of the wrapper class and never call the underlying Executor directly. It should be trivial to extend the wrapper below to wrap the 'submit' methods of an ExecutorService if needed.

    import java.util.Collection;
    import java.util.concurrent.Executor;
    
    public class ExampleExecutor {
    
        private final Executor executor;
        private long submitCount = 0;
        private long doneCount = 0;
    
        public ExampleExecutor(Executor executor) {
            this.executor = executor;
        }
    
        public synchronized void execute(Collection<Runnable> commands) {
            for (Runnable command : commands) {
                execute(command);
            }
        }
    
        public synchronized void execute(final Runnable command) {
            // Count the job before handing it over, so doneCount can never overtake submitCount
            submitCount++;
    
            executor.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        synchronized (ExampleExecutor.this) {
                            doneCount++;
                            if (doneCount == submitCount) {
                                ExampleExecutor.this.notifyAll();
                            }
                        }
                    }
                }
            });
        }
    
        public synchronized void awaitCompletion() throws InterruptedException {
            while (doneCount != submitCount) {
                this.wait();
            }
        }
    }
    

    EDIT: Added a test case below to demonstrate how the code above can be used.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Test {
    
        static class Task implements Runnable {
            private final String id;
            private final long repetitions;
            private final long respawnSize;
            private final ExampleExecutor executor;
    
            public Task(String id, long repetitions, long respawnSize, ExampleExecutor executor) {
                this.id = id;
                this.repetitions = repetitions;
                this.respawnSize = respawnSize;
                this.executor = executor;
            }
    
            public void run() {
                for (int i = 0; i < respawnSize; i++) {
                    // Spawn new sub-tasks. They spawn nothing themselves
                    // (respawnSize is 0), so they need no executor reference.
                    executor.execute(new Task(id + "-" + i, repetitions/2, 0, null));
                }
    
                // Simulate some CPU-bound work
                double sum = 0;
                for (int i = 0; i < repetitions; i++) {
                    sum += Math.sin(i);
                }
    
                System.err.println(id + " completed at " + System.currentTimeMillis() + " with sum=" + sum);
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            // Keep a handle on the pool so it can be shut down once the work is done
            ExecutorService pool = Executors.newFixedThreadPool(2);
            ExampleExecutor executor = new ExampleExecutor(pool);
            executor.execute(new Task("0", 2000000, 100, executor));
    
            System.err.println("main thread awaits completion");
            executor.awaitCompletion();
            System.err.println("main thread received completion event");
    
            // Pool threads are non-daemon; without this the JVM would not exit
            pool.shutdown();
        }
    }
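
The answer mentions that the wrapper can be extended to cover the submit methods of an ExecutorService. A minimal sketch of what that might look like for submit(Callable) follows, using the same submitted-versus-done counting. The names SubmitWrapperSketch, CountingSubmitter, node, and runTree are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class SubmitWrapperSketch {

    /** Hypothetical variant of ExampleExecutor that wraps submit(Callable) instead of execute. */
    static class CountingSubmitter {
        private final ExecutorService service;
        private long submitCount = 0;
        private long doneCount = 0;

        CountingSubmitter(ExecutorService service) {
            this.service = service;
        }

        synchronized <T> Future<T> submit(final Callable<T> task) {
            submitCount++;
            Callable<T> counted = new Callable<T>() {
                public T call() throws Exception {
                    try {
                        return task.call();
                    } finally {
                        // Count the task as done even if it threw
                        synchronized (CountingSubmitter.this) {
                            doneCount++;
                            if (doneCount == submitCount) {
                                CountingSubmitter.this.notifyAll();
                            }
                        }
                    }
                }
            };
            return service.submit(counted);
        }

        synchronized void awaitCompletion() throws InterruptedException {
            while (doneCount != submitCount) {
                wait();
            }
        }
    }

    /** Builds a task that records that it ran and submits two children while levels remain. */
    static Callable<Integer> node(final int levels, final CountingSubmitter pool,
                                  final AtomicInteger ran) {
        return new Callable<Integer>() {
            public Integer call() {
                ran.incrementAndGet();
                if (levels > 0) {
                    // Children are fired and forgotten: blocking on their
                    // futures here could starve a small fixed pool.
                    pool.submit(node(levels - 1, pool, ran));
                    pool.submit(node(levels - 1, pool, ran));
                }
                return levels;
            }
        };
    }

    /** Runs a binary tree of tasks and returns how many tasks actually ran. */
    static int runTree(int threads, int levels) {
        ExecutorService service = Executors.newFixedThreadPool(threads);
        CountingSubmitter pool = new CountingSubmitter(service);
        AtomicInteger ran = new AtomicInteger();
        try {
            pool.submit(node(levels, pool, ran));
            pool.awaitCompletion();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            service.shutdown();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run: " + runTree(2, 3)); // 1 + 2 + 4 + 8 = 15
    }
}
```

As with the execute-based wrapper, tasks must go through the wrapper's submit method and never call the underlying ExecutorService directly, otherwise the counts no longer reflect the outstanding work.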