Which Java synchronisation object should I use to ensure an arbitrarily large number of tasks are completed? The constraints are that:
- There are too many tasks to fit into memory ([...] Future for every task into a Collection and then call get on all the futures).
- [...] CountDownLatch).
- The ExecutorService may be shared so I cannot use awaitTermination( long, TimeUnit ).
For example, with Grand Central Dispatch, I might do something like this:
let workQueue = dispatch_get_global_queue( QOS_CLASS_BACKGROUND, 0 )
let latch = dispatch_group_create()
let startTime = NSDate()
var itemsProcessed = 0
let countUpdateQueue = dispatch_queue_create( "countUpdateQueue", DISPATCH_QUEUE_SERIAL )
for item in fetchItems() // generator returns too many items to store in memory
{
    dispatch_group_enter( latch )
    dispatch_async( workQueue )
    {
        self.processItem( item ) // method takes a non-trivial amount of time to run
        dispatch_async( countUpdateQueue )
        {
            itemsProcessed++
        }
        dispatch_group_leave( latch )
    }
}
dispatch_group_wait( latch, DISPATCH_TIME_FOREVER )
let endTime = NSDate()
let totalTime = endTime.timeIntervalSinceDate( startTime )
print( "Processed \(itemsProcessed) items in \(totalTime) seconds." )
It produces output that looks like this (for 128 items): Processed 128 items in 1.846794962883 seconds.
I tried something similar with a Phaser:
final Executor executor = new ThreadPoolExecutor( 64, 64, 1L, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 0 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
    latch.register();
    final Runnable task = new Runnable() {
        public void run() {
            processItem( item ); // method takes a non-trivial amount of time to run
            itemsProcessed.incrementAndGet();
            latch.arrive();
        }
    };
    executor.execute( task );
}
latch.awaitAdvance( 0 );
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );
The tasks do not always complete before the last print statement and I might get output that looks like this (for 128 items): Processed 121 items in 5.296 seconds.
Is the Phaser even the right object to use? The documentation indicates it only supports 65,535 parties so I would need to either batch the items to be processed or introduce some sort of Phaser tiering.
"to ensure an arbitrarily large number of tasks are completed" - the simplest way is to maintain a counter of completed tasks, with a blocking operation that waits until the counter reaches a given number. There is no ready-made class for this, but it is easy to write one:
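class EventCounter {
    long counter = 0;

    // called by a task when it finishes
    synchronized void up() {
        counter++;
        notifyAll();
    }

    // blocks until at least count tasks have finished
    synchronized void ensure( long count ) throws InterruptedException {
        while( counter < count ) {
            wait();
        }
    }
}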
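To make the behaviour of the counter concrete, here is a minimal, self-contained sketch of how it might be exercised. The class name EventCounterDemo, the helper runTasks, the accessor current() and the 4-thread pool are all assumptions made for illustration, not part of the original answer; the only fixed point is that ensure has to declare InterruptedException because Object.wait() throws it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EventCounterDemo {

    /** Counts finished tasks; ensure() blocks until the count reaches a target. */
    static final class EventCounter {
        private long counter = 0;

        synchronized void up() {
            counter++;
            notifyAll(); // wake every thread waiting in ensure()
        }

        synchronized void ensure(long count) throws InterruptedException {
            while (counter < count) { // loop guards against spurious wakeups
                wait();
            }
        }

        // Hypothetical accessor, added only so the demo can report the count.
        synchronized long current() {
            return counter;
        }
    }

    /** Submits taskCount trivial tasks and waits until all have reported in. */
    static long runTasks(int taskCount) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4); // pool size is arbitrary
        EventCounter finished = new EventCounter();
        try {
            for (int i = 0; i < taskCount; i++) {
                executor.execute(finished::up); // each "task" only signals completion
            }
            finished.ensure(taskCount); // returns once taskCount tasks have called up()
            return finished.current();
        } finally {
            executor.shutdown(); // the demo owns this pool, so it may shut it down
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Finished tasks: " + runTasks(16)); // prints "Finished tasks: 16"
    }
}
```

Unlike a CountDownLatch, the target count is supplied when waiting rather than when the object is created, so the submitting thread can keep a running total and pass it to ensure only after the last task has been handed off.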
"There are too many tasks to fit into memory" - so the process of submitting new tasks must be suspended when the number of running tasks is too high. The simplest way is to consider the number of running tasks as a resource and count it with a semaphore:
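final Semaphore runningTasksSema = new Semaphore( maxNumberOfRunningTasks );
final EventCounter eventCounter = new EventCounter();
for( final String item : fetchItems() ) {
    final Runnable task = new Runnable() {
        public void run() {
            try {
                processItem( item );
            } finally {
                // release the permit and count the task even if processItem throws
                runningTasksSema.release();
                eventCounter.up();
            }
        }
    };
    runningTasksSema.acquire(); // blocks while too many tasks are running; may throw InterruptedException
    executor.execute( task );
}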
When a thread wants to ensure some given number of tasks are completed, it invokes:
eventCounter.ensure(givenNumberOfFinishedTasks);
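Putting the pieces together, the following is a sketch of the whole flow under stated assumptions: the class BoundedSubmitDemo, the methods processAll and runDemo, the stub processItem, the generated item names, the pool size of 8 and the limit of 16 in-flight tasks are all hypothetical stand-ins. The submitting thread keeps its own count of submitted tasks and passes that count to ensure once the iterator is exhausted. The executor is never shut down inside processAll, so it could be a shared one.

```java
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class BoundedSubmitDemo {

    static final class EventCounter {
        private long counter = 0;

        synchronized void up() {
            counter++;
            notifyAll();
        }

        synchronized void ensure(long count) throws InterruptedException {
            while (counter < count) {
                wait();
            }
        }
    }

    // Hypothetical stand-in for the question's processItem: just sleeps briefly.
    static void processItem(String item) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Processes every item, keeping at most maxRunning tasks in flight. */
    static int processAll(Iterator<String> items, ExecutorService executor, int maxRunning)
            throws InterruptedException {
        final Semaphore running = new Semaphore(maxRunning);
        final EventCounter finished = new EventCounter();
        final AtomicInteger processed = new AtomicInteger();
        long submitted = 0; // only the submitting thread touches this

        while (items.hasNext()) {
            final String item = items.next();
            running.acquire(); // blocks while maxRunning tasks are queued or running
            executor.execute(() -> {
                try {
                    processItem(item);
                    processed.incrementAndGet();
                } finally {
                    running.release(); // free the slot even if processItem throws
                    finished.up();
                }
            });
            submitted++;
        }
        finished.ensure(submitted); // wait for exactly the tasks this call submitted
        return processed.get();
    }

    /** Runs processAll over lazily generated items on a demo-owned pool. */
    static int runDemo(int itemCount) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(8);
        try {
            Iterator<String> items =
                    IntStream.range(0, itemCount).mapToObj(i -> "item-" + i).iterator();
            return processAll(items, executor, 16);
        } finally {
            executor.shutdown(); // only because the demo created this pool itself
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Processed " + runDemo(128) + " items"); // prints "Processed 128 items"
    }
}
```

Because the semaphore caps the number of tasks that exist at any moment, neither the items nor the tasks need to fit in memory all at once, and no per-task Future has to be retained.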
Asynchronous (nonblocking) versions of the runningTasksSema.acquire() and eventCounter.ensure() operations can be designed, but they would be more complex.