As an answer to the question about pausing a BlockingQueue, I came up with the idea of using an existing blocking structure, blockingQueue2, and guarding the state with two different locks.
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {
    private static final long serialVersionUID = 184661285402L;

    private Object lock1 = new Object(); // used in pause() and in take()
    private Object lock2 = new Object(); // used in pause() and unpause()

    //@GuardedBy("lock1")
    private volatile boolean paused;
    private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>();

    public void pause() {
        if (!paused) {
            synchronized (lock1) {
                synchronized (lock2) {
                    if (!paused) {
                        paused = true;
                        // make sure it is empty; e.g. after successive calls to pause() and
                        // unpause() without any consumers it would otherwise stay non-empty
                        blockingQueue2.clear();
                    }
                }
            }
        }
    }

    public void unpause() throws InterruptedException {
        if (paused) {
            synchronized (lock2) {
                paused = false;
                blockingQueue2.put(new Object()); // will release the waiting thread, if there is one
            }
        }
    }

    @Override
    public E take() throws InterruptedException {
        E result = super.take();
        if (paused) {
            synchronized (lock1) { // guarantees a single thread is inside; all other threads wait here
                if (paused) {
                    blockingQueue2.take();
                }
            }
        }
        return result;
    }

    // TODO: override the poll() method similarly.
}
I need two different locks, otherwise unpause() could wait for lock1, which is already held in take() by a consumer thread.
My questions:
Could this come to a deadlock?
Does it work at all?
How often did you see such code, as I myself don't find it readable?
How should I annotate the paused flag: with @GuardedBy("lock1, lock2")?
PS: Any improvements are welcome (besides that I could have used a binary semaphore instead of blockingQueue2).
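For reference, here is a minimal sketch of that Semaphore variant (my own reading of that remark, not code from the question; a Semaphore with zero initial permits takes the place of blockingQueue2, with drainPermits() playing the role of the clear() call, and the class name is just illustrative):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

public class BlockingQueueWithPauseSem<E> extends LinkedBlockingQueue<E> {
    private static final long serialVersionUID = 184661285402L;

    private final Object lock1 = new Object(); // used in pause() and in take()
    private final Object lock2 = new Object(); // used in pause() and unpause()

    private volatile boolean paused;
    private final Semaphore semaphore = new Semaphore(0); // no permits available while paused

    public void pause() {
        if (!paused) {
            synchronized (lock1) {
                synchronized (lock2) {
                    if (!paused) {
                        paused = true;
                        semaphore.drainPermits(); // analogous to clearing blockingQueue2
                    }
                }
            }
        }
    }

    public void unpause() {
        if (paused) {
            synchronized (lock2) {
                paused = false;
                semaphore.release(); // wakes the single consumer parked in take(), if any
            }
        }
    }

    @Override
    public E take() throws InterruptedException {
        E result = super.take();
        if (paused) {
            synchronized (lock1) { // only one consumer at a time reaches the acquire()
                if (paused) {
                    semaphore.acquire();
                }
            }
        }
        return result;
    }
}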
I will answer your questions one by one.
Could this come to a deadlock?
No, this will not result in a deadlock. If you obtained lock1 and lock2 in different orders, then that could result in a deadlock; since you always obtain them in the same order when holding both, you should be fine.
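As a standalone (hypothetical) illustration of that rule, unrelated to your queue: two threads that acquire the same two locks in opposite orders can each end up waiting for the lock the other one holds.

public class LockOrderDeadlock {
    private static final Object lockA = new Object();
    private static final Object lockB = new Object();

    public static void main(String[] args) {
        // Thread 1 acquires lockA then lockB.
        new Thread(() -> {
            synchronized (lockA) {
                nap();                      // give the other thread time to grab lockB
                synchronized (lockB) { }    // waits for lockB, held by thread 2
            }
        }).start();
        // Thread 2 acquires them in the opposite order: lockB then lockA.
        new Thread(() -> {
            synchronized (lockB) {
                nap();
                synchronized (lockA) { }    // waits for lockA, held by thread 1 -> typically deadlocks
            }
        }).start();
    }

    private static void nap() {
        try { Thread.sleep(100); } catch (InterruptedException ignored) { }
    }
}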
Does it work at all?
It appears to. All happens-before ordering seems to be satisfied.
How often did you see such code, as I myself don't find it readable?
I've never seen this sort of implementation before. I agree it isn't very elegant.
I am going to pose an alternative solution which uses a Phaser. One can argue this isn't any more elegant, just an alternate approach. I have reviewed it for some time and I think it is sufficient. Granted, I've never seen this approach either, but it was fun to think up.
// Uses java.util.concurrent.Phaser and java.util.concurrent.LinkedBlockingQueue.
public static class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {
    private static final long serialVersionUID = 184661285402L;

    private final Phaser phaser = new Phaser(1);
    // the phase that take() waits on; updated by pause() to the phaser's current phase
    private volatile int phase = phaser.getPhase();

    public BlockingQueueWithPause() {
        // base case, all phase-0 awaits will succeed straight through
        phaser.arrive();
    }

    public void pause() {
        // record the current phase; take() will now block in awaitAdvance() until unpause()
        phase = phaser.getPhase();
    }

    public void unpause() throws InterruptedException {
        // advance the phaser, releasing every take() waiting on the recorded phase
        phaser.arrive();
    }

    @Override
    public E take() throws InterruptedException {
        phaser.awaitAdvance(phase); // fast-path when not paused, otherwise wait for unpause()
        return super.take();
    }
}
I guess I should explain this solution. The Phaser is like what you would get if CyclicBarrier and CountDownLatch had a child. It allows re-using the barrier while not having to wait for the barrier to trip.
In the base case, the shared phase field is 0. Since arrive() is called in the constructor, the phaser's internal phase is 1. So, if take() is invoked without pause() ever having been called, awaitAdvance() is called with 0. Since the internal phase is 1, the phaser fast-paths out and the call is just a simple volatile read (the 0 phase has already occurred, so there is no longer anything to wait for).
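As a tiny standalone snippet showing that fast path (the class name is just illustrative, not part of the queue):

import java.util.concurrent.Phaser;

public class PhaserFastPathDemo {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1);  // one registered party, internal phase 0
        phaser.arrive();                // the only party arrives, the internal phase advances to 1
        phaser.awaitAdvance(0);         // phase 0 has already passed, so this returns immediately
        System.out.println("phase is now " + phaser.getPhase()); // prints "phase is now 1"
    }
}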
If pause() is invoked, the shared phase variable is updated to the phaser's internal phase, which is now 1. So a take() will call awaitAdvance() with 1, causing it to suspend.
The arrive() in unpause() will release all threads waiting in awaitAdvance() and increment the phaser's internal phase to 2. Again, subsequent take()s will fast-path out until pause() is called again.
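To round this off, here is a rough usage sketch, assuming the class above is available as a top-level class (PauseDemo and the printed text are just illustrative): a consumer started after pause() parks in awaitAdvance() and only drains the queue once unpause() is called.

import java.util.concurrent.TimeUnit;

public class PauseDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueueWithPause<String> queue = new BlockingQueueWithPause<>();
        queue.put("a");
        queue.pause();                    // consumers entering take() from now on will stall

        Thread consumer = new Thread(() -> {
            try {
                System.out.println("took " + queue.take()); // parked in awaitAdvance() while paused
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        TimeUnit.MILLISECONDS.sleep(200); // the consumer is waiting, "a" is still in the queue
        queue.unpause();                  // releases the consumer, which then takes "a"
        consumer.join();
    }
}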