Which Java synchronisation object should I use to ensure an arbitrarily large number of tasks are completed? The constraints are that:

  1. Each task takes a non-trivial amount of time to complete and it is appropriate to perform tasks in parallel.
  2. There are too many tasks to fit into memory (i.e. I cannot put a Future for every task into a Collection and then call get on all the futures).
  3. I do not know how many tasks there will be (i.e. I cannot use a CountDownLatch).
  4. The ExecutorService may be shared, so I cannot use awaitTermination( long, TimeUnit ).

For example, with Grand Central Dispatch, I might do something like this:

    let workQueue = dispatch_get_global_queue( QOS_CLASS_BACKGROUND, 0 )
    let latch = dispatch_group_create()
    let startTime = NSDate()
    var itemsProcessed = 0
    let countUpdateQueue = dispatch_queue_create( "countUpdateQueue", DISPATCH_QUEUE_SERIAL )
    for item in fetchItems() { // generator returns too many items to store in memory
        dispatch_group_enter( latch )
        dispatch_async( workQueue ) {
            self.processItem( item ) // method takes a non-trivial amount of time to run
            dispatch_async( countUpdateQueue ) {
                itemsProcessed += 1
            }
            dispatch_group_leave( latch )
        }
    }
    dispatch_group_wait( latch, DISPATCH_TIME_FOREVER )
    let endTime = NSDate()
    let totalTime = endTime.timeIntervalSinceDate( startTime )
    print( "Processed \(itemsProcessed) items in \(totalTime) seconds." )

It produces output that looks like this (for 128 items): Processed 128 items in 1.846794962883 seconds.
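The dispatch group in this example acts as a counter of outstanding work items: dispatch_group_enter increments it, dispatch_group_leave decrements it, and dispatch_group_wait blocks until it is back to zero. A rough Java analogue of those three calls might look like the sketch below. The class names GroupDemo and TaskGroup are hypothetical, the fixed thread pool and the 128 stand-in tasks are arbitrary demo choices, and this sketch alone does not bound memory use (the pool's queue is unbounded).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class GroupDemo {
    // Hypothetical analogue of a GCD dispatch group: a count of outstanding
    // work items, guarded by this object's monitor.
    static final class TaskGroup {
        private long outstanding = 0;

        synchronized void enter() {             // like dispatch_group_enter
            outstanding++;
        }

        synchronized void leave() {             // like dispatch_group_leave
            if (outstanding == 0) {
                throw new IllegalStateException("leave() without matching enter()");
            }
            if (--outstanding == 0) {
                notifyAll();                    // wake threads blocked in await()
            }
        }

        // like dispatch_group_wait(..., DISPATCH_TIME_FOREVER)
        synchronized void await() throws InterruptedException {
            while (outstanding > 0) {
                wait();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService workQueue = Executors.newFixedThreadPool(8);
        TaskGroup group = new TaskGroup();
        AtomicInteger itemsProcessed = new AtomicInteger();
        for (int i = 0; i < 128; i++) {         // stand-in for fetchItems()
            group.enter();                      // enter on the submitting thread, before execute()
            workQueue.execute(() -> {
                try {
                    itemsProcessed.incrementAndGet(); // stand-in for processItem(item)
                } finally {
                    group.leave();
                }
            });
        }
        group.await();
        System.out.println("Processed " + itemsProcessed.get() + " items.");
        workQueue.shutdown();
    }
}
```

The detail that matters is calling enter() on the submitting thread before execute(), as the Swift code does with dispatch_group_enter before dispatch_async; if each task entered the group itself, the count could drop back to zero while items were still being submitted.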

I tried something similar with a Phaser:

    final Executor executor = new ThreadPoolExecutor( 64, 64, 1L, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
    final Phaser latch = new Phaser( 0 );
    final long startTime = currentTimeMillis();
    final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
    for( final String item : fetchItems() ) { // iterator returns too many items to store in memory
        latch.register();
        final Runnable task = new Runnable() {
            public void run() {
                processItem( item ); // method takes a non-trivial amount of time to run
                itemsProcessed.incrementAndGet();
                latch.arrive();
            }
        };
        executor.execute( task );
    }
    latch.awaitAdvance( 0 );
    final long endTime = currentTimeMillis();
    out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );

The tasks do not always complete before the last print statement, and I might get output like this (for 128 items): Processed 121 items in 5.296 seconds. Is the Phaser even the right object to use? The documentation indicates that it supports at most 65,535 parties, so I would need to either batch the items to be processed or introduce some sort of Phaser tiering.
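The short count is consistent with a race in how phase 0 ends. Assuming the loop registers each task with latch.register() and each task calls latch.arrive() when done, a Phaser created with zero parties advances past phase 0 as soon as every party registered so far has arrived, which can happen while the loop is still submitting; awaitAdvance(0) then returns immediately. A minimal sketch of one common fix, assuming a small bounded ThreadPoolExecutor and a stand-in for processItem (the class name PhaserDemo is hypothetical): register the submitting thread itself as a party, so the phase cannot advance until it arrives, and have each task call arriveAndDeregister(). With a bounded queue and CallerRunsPolicy, only a small, bounded number of parties is registered at any moment, which stays far below the 65,535 limit.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PhaserDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4, 4, 1L, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(8),
                new ThreadPoolExecutor.CallerRunsPolicy()); // bounds the number of in-flight tasks
        Phaser latch = new Phaser(1);           // party #1 is this submitting thread
        AtomicInteger itemsProcessed = new AtomicInteger();
        for (int i = 0; i < 128; i++) {         // stand-in for fetchItems()
            latch.register();                   // one party per task, registered before execute()
            executor.execute(() -> {
                try {
                    itemsProcessed.incrementAndGet(); // stand-in for processItem(item)
                } finally {
                    latch.arriveAndDeregister();      // frees the party slot when the task ends
                }
            });
        }
        // Phase 0 can only end once this thread and every still-registered task have arrived.
        latch.arriveAndAwaitAdvance();
        System.out.println("Processed " + itemsProcessed.get() + " items.");
        executor.shutdown();
    }
}
```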


  • "to ensure an arbitrarily large number of tasks are completed": the simplest way is to maintain a counter of completed tasks, with a blocking operation that waits until a given number of completed tasks is reached. There is no ready-made class for this, but it is easy to write one:

    class EventCounter {
        long counter = 0;
        synchronized void up() {
            counter++;
            notifyAll(); // wake threads blocked in ensure()
        }
        synchronized void ensure(long count) throws InterruptedException {
            while (counter < count) wait();
        }
    }
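A self-contained way to exercise such a counter, assuming up() increments the count and calls notifyAll() so that threads in ensure() re-check it. The names EventCounterDemo and get() are hypothetical additions for the demo, and the tasks do nothing except report completion. Note that the submitting loop only needs to know how many tasks it submitted once it has finished, which is why an unknown task count is not a problem here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EventCounterDemo {
    static class EventCounter {
        private long counter = 0;

        synchronized void up() {
            counter++;
            notifyAll();                 // let threads blocked in ensure() re-check the count
        }

        synchronized void ensure(long count) throws InterruptedException {
            while (counter < count) {
                wait();                  // the loop also guards against spurious wake-ups
            }
        }

        synchronized long get() {        // hypothetical accessor, used for reporting only
            return counter;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        EventCounter done = new EventCounter();
        long submitted = 0;
        for (int i = 0; i < 100; i++) {
            executor.execute(done::up);  // stand-in task: just reports its own completion
            submitted++;
        }
        done.ensure(submitted);          // blocks until every submitted task has called up()
        System.out.println("Completed " + done.get() + " tasks.");
        executor.shutdown();
    }
}
```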

    "There are too many tasks to fit into memory" - so the process of submitting new tasks must be suspended when the number of running tasks is too high. The simplest way is to consider the number of running tasks as a resource and count it with a semaphore:

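    Semaphore runningTasksSema = new Semaphore(maxNumberOfRunningTasks);
    EventCounter eventCounter = new EventCounter();
    for (final String item : fetchItems()) {
        runningTasksSema.acquire(); // blocks while maxNumberOfRunningTasks tasks are running
        executor.execute(new Runnable() {
            public void run() {
                processItem(item);
                runningTasksSema.release(); // let the submitting loop continue
                eventCounter.up();          // count one completed task
            }
        });
    }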
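Putting the counter and the semaphore together, a self-contained sketch might look like the following. It assumes a fixed thread pool standing in for the shared executor; fetchItems, processItem and processAll are hypothetical stand-ins (fetchItems generates items lazily from a stream, so they are never all in memory at once). Because the final wait is on this batch's own counter rather than on awaitTermination(), other users of the executor are unaffected.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.stream.IntStream;

public class ThrottledSubmitDemo {
    static class EventCounter {
        private long counter = 0;
        synchronized void up() { counter++; notifyAll(); }
        synchronized void ensure(long count) throws InterruptedException {
            while (counter < count) wait();
        }
        synchronized long get() { return counter; }
    }

    // Hypothetical stand-in for fetchItems(): items are generated lazily, one at a time.
    static Iterable<String> fetchItems(int n) {
        return () -> IntStream.range(0, n).mapToObj(i -> "item-" + i).iterator();
    }

    // Hypothetical stand-in for the real, slow work.
    static void processItem(String item) {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static long processAll(ExecutorService executor, Iterable<String> items,
                           int maxNumberOfRunningTasks) throws InterruptedException {
        Semaphore runningTasksSema = new Semaphore(maxNumberOfRunningTasks);
        EventCounter eventCounter = new EventCounter();
        long submitted = 0;
        for (String item : items) {
            runningTasksSema.acquire();         // blocks while maxNumberOfRunningTasks tasks are in flight
            executor.execute(() -> {
                try {
                    processItem(item);
                } finally {
                    runningTasksSema.release(); // let the submitting loop continue
                    eventCounter.up();          // counts the task as completed, even if it failed
                }
            });
            submitted++;
        }
        eventCounter.ensure(submitted);         // waits for these tasks only, not the whole executor
        return eventCounter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService shared = Executors.newFixedThreadPool(8); // could also be serving other work
        long start = System.currentTimeMillis();
        long n = processAll(shared, fetchItems(128), 16);
        System.out.println("Processed " + n + " items in "
                + (System.currentTimeMillis() - start) / 1000.0 + " seconds.");
        shared.shutdown();
    }
}
```

Releasing the permit and calling up() in a finally block keeps a failing processItem from leaking a permit or leaving ensure() waiting forever; whether failures should count as "completed" is a design choice for the real code.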

    When a thread wants to ensure that a given number of tasks have completed, it calls eventCounter.ensure(count) with that number.

    Asynchronous (non-blocking) versions of the runningTasksSema.acquire() and eventCounter.ensure() operations can be designed, but they would be more complex.
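As one illustration of the non-blocking side, ensure() could be replaced by a method that returns a CompletableFuture completed once the count is reached. In this sketch the names AsyncEventCounter and whenReached are hypothetical; pending waiters sit in a priority queue ordered by target count and are completed outside the lock, so callbacks attached with thenRun never run while the monitor is held. It does not attempt a non-blocking acquire(), which would also need a way to resume the submitting loop when a permit frees up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;

public class AsyncEventCounter {
    // A pending request: complete `future` once the counter reaches `target`.
    private static final class Waiter {
        final long target;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        Waiter(long target) { this.target = target; }
    }

    private long counter = 0;
    private final PriorityQueue<Waiter> waiters =
            new PriorityQueue<>((a, b) -> Long.compare(a.target, b.target));

    /** Non-blocking analogue of ensure(count): the future completes when counter >= count. */
    public CompletableFuture<Void> whenReached(long count) {
        synchronized (this) {
            if (counter < count) {
                Waiter w = new Waiter(count);
                waiters.add(w);
                return w.future;
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Records one completed task and releases every waiter whose target is now met. */
    public void up() {
        List<Waiter> ready = new ArrayList<>();
        synchronized (this) {
            counter++;
            while (!waiters.isEmpty() && waiters.peek().target <= counter) {
                ready.add(waiters.poll());
            }
        }
        // Complete outside the lock so dependent callbacks never run while holding it.
        for (Waiter w : ready) {
            w.future.complete(null);
        }
    }

    public synchronized long get() { return counter; }

    public static void main(String[] args) {
        AsyncEventCounter c = new AsyncEventCounter();
        CompletableFuture<Void> f = c.whenReached(3)
                .thenRun(() -> System.out.println("three tasks done"));
        c.up();
        c.up();
        System.out.println("done yet? " + f.isDone());
        c.up();                                 // reaches the target and completes the waiter
        System.out.println("done yet? " + f.isDone());
    }
}
```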