I have multiple consumer threads waiting on a CountDownLatch of size 1 using await(). I have a single producer thread that calls countDown() when it successfully finishes.
This works great when there are no errors.
However, if the producer detects an error, I would like for it to be able to signal the error to the consumer threads. Ideally I could have the producer call something like abortCountDown() and have all of the consumers receive an InterruptedException or some other exception. I don't want to call countDown(), because this requires all of my consumer threads to then do an additional manual check for success after their call to await(). I'd rather they just receive an exception, which they already know how to handle.
I know that an abort facility is not available in CountDownLatch. Is there another synchronization primitive that I can easily adapt to effectively create a CountDownLatch that supports aborting the countdown?
JB Nizet had a great answer. I took his and polished it a little bit. The result is a subclass of CountDownLatch called AbortableCountDownLatch, which adds an abort() method that causes all threads waiting on the latch to receive an AbortedException (a subclass of InterruptedException).
Also, unlike JB's class, the AbortableCountDownLatch will abort all blocking threads immediately on an abort, rather than waiting for the countdown to reach zero (for situations where you use a count>1).
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AbortableCountDownLatch extends CountDownLatch {
    // volatile so that a waiter whose timed await() expires still sees the flag
    protected volatile boolean aborted = false;

    public AbortableCountDownLatch(int count) {
        super(count);
    }

    /**
     * Unblocks all threads waiting on this latch and causes them to receive an
     * AbortedException. If the latch has already counted all the way down,
     * this method does nothing.
     */
    public void abort() {
        if (getCount() == 0)
            return;

        // Set the flag first, then drain the count so every waiter wakes up.
        this.aborted = true;
        while (getCount() > 0)
            countDown();
    }

    @Override
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        final boolean rtrn = super.await(timeout, unit);
        if (aborted)
            throw new AbortedException();
        return rtrn;
    }

    @Override
    public void await() throws InterruptedException {
        super.await();
        if (aborted)
            throw new AbortedException();
    }

    public static class AbortedException extends InterruptedException {
        public AbortedException() {
        }

        public AbortedException(String detailMessage) {
            super(detailMessage);
        }
    }
}
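For anyone who wants to try it, here is a rough sketch of how the producer and consumers from the question might use the latch. The names AbortDemo and runScenario are made up for illustration, and the nested latch is a condensed copy of the class above (without the timed await), repeated only so the example compiles on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class AbortDemo {

    // Condensed copy of the latch above, repeated so this sketch is self-contained.
    static class AbortableCountDownLatch extends CountDownLatch {
        private volatile boolean aborted = false;

        AbortableCountDownLatch(int count) {
            super(count);
        }

        void abort() {
            if (getCount() == 0)
                return;
            aborted = true;
            while (getCount() > 0)
                countDown();
        }

        @Override
        public void await() throws InterruptedException {
            super.await();
            if (aborted)
                throw new AbortedException();
        }

        static class AbortedException extends InterruptedException {
        }
    }

    /**
     * Hypothetical helper: starts the given number of consumer threads on a
     * latch of size 1, then plays the producer, which either aborts (fail)
     * or counts down. Returns how many consumers received AbortedException.
     */
    static int runScenario(int consumers, boolean fail) throws InterruptedException {
        AbortableCountDownLatch latch = new AbortableCountDownLatch(1);
        AtomicInteger abortedCount = new AtomicInteger();
        Thread[] threads = new Thread[consumers];

        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                    // normal path: the producer finished successfully
                } catch (AbortableCountDownLatch.AbortedException e) {
                    // error path: no manual success check needed after await()
                    abortedCount.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }

        // Producer: signal failure or success.
        if (fail)
            latch.abort();
        else
            latch.countDown();

        for (Thread t : threads)
            t.join();
        return abortedCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runScenario(3, true));  // prints 3
        System.out.println(runScenario(3, false)); // prints 0
    }
}
```

Because AbortedException extends InterruptedException, consumers that only catch InterruptedException keep working unchanged. Catching AbortedException separately is only needed if they want to tell an abort apart from an ordinary thread interrupt.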