java multithreading concurrency synchronization blocking

Synchronisation object to ensure all tasks are completed


Which Java synchronisation object should I use to ensure an arbitrarily large number of tasks are completed? The constraints are that:

  1. Each task takes a non-trivial amount of time to complete and it is appropriate to perform tasks in parallel.
  2. There are too many tasks to fit into memory (i.e. I cannot put a Future for every task into a Collection and then call get on all the futures).
  3. I do not know how many tasks there will be (i.e. I cannot use a CountDownLatch).
  4. The ExecutorService may be shared, so I cannot use awaitTermination( long, TimeUnit ).

For example, with Grand Central Dispatch, I might do something like this:

let workQueue = dispatch_get_global_queue( QOS_CLASS_BACKGROUND, 0 )
let latch = dispatch_group_create()
let startTime = NSDate()
var itemsProcessed = 0
let countUpdateQueue = dispatch_queue_create( "countUpdateQueue", DISPATCH_QUEUE_SERIAL )
for item in fetchItems() // generator returns too many items to store in memory
{
    dispatch_group_enter( latch )
    dispatch_async( workQueue )
    {
        self.processItem( item ) // method takes a non-trivial amount of time to run
        dispatch_async( countUpdateQueue )
        {
            itemsProcessed++
        }
        dispatch_group_leave( latch )
    }
}
dispatch_group_wait( latch, DISPATCH_TIME_FOREVER )
let endTime = NSDate()
let totalTime = endTime.timeIntervalSinceDate( startTime )
print( "Processed \(itemsProcessed) items in \(totalTime) seconds." )

It produces output that looks like this (for 128 items): Processed 128 items in 1.846794962883 seconds.

I tried something similar with a Phaser:

final Executor executor = new ThreadPoolExecutor( 64, 64, 1l, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 0 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
    latch.register();
    final Runnable task = new Runnable() {
        public void run() {
            processItem( item ); // method takes a non-trivial amount of time to run
            itemsProcessed.incrementAndGet();
            latch.arrive();
        }
    };
    executor.execute( task );
}
latch.awaitAdvance( 0 );
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );

The tasks do not always complete before the last print statement and I might get output that looks like this (for 128 items): Processed 121 items in 5.296 seconds. Is the Phaser even the right object to use? The documentation indicates it only supports 65,535 parties so I would need to either batch the items to be processed or introduce some sort of Phaser tiering.


Solution

  • "to ensure an arbitrarily large number of tasks are completed" - the simplest way is to maintain a counter of completed tasks, with a blocking operation that waits until the counter reaches a given value. There is no ready-made class for this, but it is easy to write one:

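    class EventCounter {
        private long counter = 0;

        // Called by a task when it finishes.
        synchronized void up() {
            counter++;
            notifyAll();
        }

        // Blocks until at least `count` tasks have called up().
        synchronized void ensure(long count) throws InterruptedException {
            while (counter < count) {
                wait();
            }
        }
    }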
    

    "There are too many tasks to fit into memory" - so the submission of new tasks must be suspended while the number of running tasks is too high. The simplest way is to treat each running task as a resource and count those resources with a semaphore:

    Semaphore runningTasksSema = new Semaphore(maxNumberOfRunningTasks);
    EventCounter eventCounter = new EventCounter();

    for (final String item : fetchItems()) {
        final Runnable task = new Runnable() {
            public void run() {
                try {
                    processItem(item);
                } finally {
                    // Always free the slot and count the task, even if processItem throws.
                    runningTasksSema.release();
                    eventCounter.up();
                }
            }
        };
        // Blocks while too many tasks are in flight; may throw InterruptedException.
        runningTasksSema.acquire();
        executor.execute(task);
    }
    

    When a thread wants to ensure that a given number of tasks have completed (for example, the total that the submitting loop has counted so far), it invokes:

    eventCounter.ensure(givenNumberOfFinishedTasks);
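
    Putting the pieces together, a minimal self-contained sketch might look like the following. It is an illustration under assumptions, not a definitive implementation: BoundedSubmitDemo, runAll, the fixed pool of four threads and the loop over integers are made-up stand-ins, and the counter increment takes the place of processItem. The loop counts its own submissions so that it knows which total to pass to ensure(), even though the number of tasks is not known in advance.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedSubmitDemo {

    /** Counter from the answer: up() records a completion, ensure() blocks until enough happened. */
    static final class EventCounter {
        private long counter = 0;

        synchronized void up() {
            counter++;
            notifyAll();
        }

        synchronized void ensure(long count) throws InterruptedException {
            while (counter < count) {
                wait();
            }
        }
    }

    /** Runs n dummy tasks with at most maxRunning in flight; returns how many were processed. */
    static int runAll(int n, int maxRunning) {
        // Assumption: a private pool stands in for the (possibly shared) executor.
        ExecutorService executor = Executors.newFixedThreadPool(4);
        Semaphore runningTasksSema = new Semaphore(maxRunning);
        EventCounter eventCounter = new EventCounter();
        AtomicInteger itemsProcessed = new AtomicInteger();
        long submitted = 0;
        try {
            for (int i = 0; i < n; i++) {          // stands in for iterating fetchItems()
                runningTasksSema.acquire();        // wait for a free slot before submitting
                submitted++;
                executor.execute(() -> {
                    try {
                        itemsProcessed.incrementAndGet(); // placeholder for processItem(item)
                    } finally {
                        runningTasksSema.release();
                        eventCounter.up();
                    }
                });
            }
            eventCounter.ensure(submitted);        // block until every submitted task has finished
            return itemsProcessed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            executor.shutdown();                   // only because this demo owns the pool
        }
    }

    public static void main(String[] args) {
        System.out.println("Processed " + runAll(128, 8) + " items.");
    }
}
```

    With a shared executor the shutdown() call would be dropped; the semaphore and the counter do not depend on owning the pool.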
    

    Asynchronous (nonblocking) versions of the runningTasksSema.acquire() and eventCounter.ensure() operations can be designed, but they would be more complex.
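
    As for the Phaser in the question: a Phaser created with zero parties advances as soon as every currently registered party has arrived, which can happen in the middle of the submission loop, so awaitAdvance( 0 ) may return while tasks are still pending. One possible variant, sketched here under assumptions, registers the submitting thread as a party of its own and has each task call arriveAndDeregister(). The phase then cannot advance before the submitter arrives, and the number of registered parties is bounded by the tasks in flight rather than by the total, provided submission itself is throttled (here by the bounded queue and CallerRunsPolicy, as in the question). PhaserDrainDemo, runAll and the pool sizes are illustrative, and the counter increment stands in for processItem.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserDrainDemo {

    /** Runs n dummy tasks and returns how many were processed once all of them have finished. */
    static int runAll(int n) {
        // Bounded queue plus CallerRunsPolicy throttles submission, as in the question.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                4, 4, 1L, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(8),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Phaser phaser = new Phaser(1);             // the one initial party is the submitting thread
        AtomicInteger itemsProcessed = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++) {          // stands in for iterating fetchItems()
                phaser.register();                 // one party per task in flight
                executor.execute(() -> {
                    try {
                        itemsProcessed.incrementAndGet(); // placeholder for processItem(item)
                    } finally {
                        phaser.arriveAndDeregister();     // frees the party slot for later tasks
                    }
                });
            }
            // The submitter is the last party to arrive; this returns once every task has deregistered.
            phaser.arriveAndAwaitAdvance();
            return itemsProcessed.get();
        } finally {
            executor.shutdown();                   // only because this demo owns the pool
        }
    }

    public static void main(String[] args) {
        System.out.println("Processed " + runAll(128) + " items.");
    }
}
```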