java multithreading concurrency java.util.concurrent

Guarding state with two locks


As an answer to the question about pausing a BlockingQueue, I came up with the idea of using an existing blocking structure, blockingQueue2, and guarding the state with two different locks.

public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 184661285402L;

    private Object lock1 = new Object();//used in pause() and in take()
    private Object lock2 = new Object();//used in pause() and unpause()

    //@GuardedBy("lock1")
    private volatile boolean paused;

    private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>();

    public void pause() {
        if (!paused) {
            synchronized (lock1) {
            synchronized (lock2) {
                if (!paused) {
                    paused = true;
                    blockingQueue2.clear();//make sure it is empty: after successive calls to pause() and unpause() without any consumers it would otherwise remain non-empty
                }
            }
            }
        }
    }

    public void unpause() throws InterruptedException {
        if (paused) {
            synchronized (lock2) {
                paused = false;
                blockingQueue2.put(new Object());//will release waiting thread, if there is one
            }
        }
    }

    @Override
    public E take() throws InterruptedException {
        E result = super.take();

        if (paused) {
            synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting
                if (paused) {
                    blockingQueue2.take();
                }
            }
        }

        return result;
    }

    //TODO override similarly the poll() method.
}

I need two different locks; otherwise unpause() could block waiting for lock1, which a consumer thread may already hold in take().

My questions:

  1. Could this come to a deadlock?
  2. Does it work at all?
  3. How often did you see such code, as I myself don't find it readable?
  4. How should I annotate the paused flag: with @GuardedBy("lock1, lock2")?

PS: Any improvements are welcome (beside that I could have used a binary semaphore instead of blockingQueue2).


Solution

  • I will answer your questions one by one

    Could this come to a deadlock?

    No, this will not result in a deadlock. If you obtained lock1 and lock2 in different orders, then that could result in a deadlock. Since you always obtain them in the same order when holding both, you should be fine.
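
    To make the ordering argument concrete, here is a minimal sketch of the same locking shape. The class and method names (LockOrderingSketch, withBothLocks, withInnerLockOnly, runDemo) are made up for illustration and are not part of the question's code. One path takes lock1 and then lock2, as pause() does, and the other takes only lock2, as unpause() does, so no thread ever holds lock2 while asking for lock1.

```java
class LockOrderingSketch {
    private final Object lock1 = new Object();
    private final Object lock2 = new Object();
    private int counter; // guarded by lock2 in this sketch

    // Needs both monitors: always lock1 first, then lock2 (the shape of pause()).
    void withBothLocks() {
        synchronized (lock1) {
            synchronized (lock2) {
                counter++;
            }
        }
    }

    // Needs only lock2 (the shape of unpause()); never requests lock1 while holding lock2.
    void withInnerLockOnly() {
        synchronized (lock2) {
            counter++;
        }
    }

    int counter() {
        synchronized (lock2) {
            return counter;
        }
    }

    // Runs both paths concurrently and returns the final counter value.
    static int runDemo(int iterations) {
        LockOrderingSketch sketch = new LockOrderingSketch();
        Thread a = new Thread(() -> {
            for (int i = 0; i < iterations; i++) sketch.withBothLocks();
        });
        Thread b = new Thread(() -> {
            for (int i = 0; i < iterations; i++) sketch.withInnerLockOnly();
        });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for demo threads", e);
        }
        return sketch.counter();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(10_000));
    }
}
```

    A run that terminates is not a proof, of course. The argument is the ordering itself: a cycle would need some thread to hold lock2 and wait for lock1, and neither path does that.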

    Does it work at all?

    It appears to. All happens-before ordering seems to be satisfied.

    How often did you see such code, as I myself don't find it readable?

    I've never seen this sort of implementation before. I agree it isn't very elegant.


    I am going to pose an alternative solution which uses a Phaser. One can argue this isn't any more elegant, just an alternate approach. I have reviewed it for some time and I think it is sufficient. Granted, I've never seen this approach either, but it was fun to think up.

    public static class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {
    
        private static final long serialVersionUID = 184661285402L;
    
        private final Phaser phaser = new Phaser(1);
        private volatile int phase = phaser.getPhase();
    
        public BlockingQueueWithPause() {
            // base case: advance past phase 0 so that every awaitAdvance(0) returns immediately.
            phaser.arrive();
        }
    
        public void pause() {
            phase = phaser.getPhase();
        }
    
        public void unpause() throws InterruptedException {
            phaser.arrive();
        }
    
        @Override
        public E take() throws InterruptedException {
            phaser.awaitAdvance(phase);
    
            E result = super.take();
    
            return result;
        }
    }
    

    I guess I should explain this solution. A Phaser is what you would get if CyclicBarrier and CountDownLatch had a child. It lets you reuse the barrier, and a party can arrive at it without waiting for it to trip.

    In the base case, the shared phase variable is 0. Since arrive is called in the constructor, the phaser's internal phase is 1. So, if take is invoked without pause ever having been called, awaitAdvance is called with 0. Since the internal phase is already 1, the phaser takes its fast path and returns immediately, which amounts to a simple volatile load (phase 0 has already completed, so there is nothing to wait for).
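
    The base case can be checked in isolation with a bare Phaser. This is only a sketch; the class name PhaserBaseCaseSketch and its phases() helper are hypothetical and exist purely to expose the three numbers discussed above.

```java
import java.util.concurrent.Phaser;

class PhaserBaseCaseSketch {
    // Returns {phase before arrive, phase after arrive, value returned by awaitAdvance(before)}.
    static int[] phases() {
        Phaser phaser = new Phaser(1);              // one registered party, as in the answer
        int before = phaser.getPhase();             // what the field initializer captures
        phaser.arrive();                            // what the constructor does: the only party arrives
        int after = phaser.getPhase();              // the phaser has advanced by one phase
        int returned = phaser.awaitAdvance(before); // stale phase: returns without blocking
        return new int[] {before, after, returned};
    }

    public static void main(String[] args) {
        int[] p = phases();
        System.out.println(p[0] + " " + p[1] + " " + p[2]);
    }
}
```

    Because the single registered party stays registered, each arrive() completes a phase, and awaitAdvance only blocks when its argument equals the current phase.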

    If pause is invoked, the shared phase variable is updated to the phaser's internal phase, which is now 1. A take will then call awaitAdvance on 1, causing it to suspend.

    The arrive in unpause releases all threads blocked in awaitAdvance and increments the phaser's internal phase to 2. Again, subsequent takes will fast-path out as long as there is no corresponding pause.
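
    The pause and unpause steps can be exercised the same way without a queue. In this sketch the class PhaserPauseSketch and its two helpers are hypothetical, and the 100 ms timeout is an arbitrary value chosen only so that the "paused" wait can be observed from a single thread. It uses awaitAdvanceInterruptibly with a timeout in place of the plain awaitAdvance from the answer.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PhaserPauseSketch {
    // True if waiting on the captured phase is still blocked when the timeout expires.
    static boolean blocksWhilePaused() {
        Phaser phaser = new Phaser(1);
        phaser.arrive();               // constructor step: internal phase is now 1
        int phase = phaser.getPhase(); // pause(): capture the current phase
        try {
            phaser.awaitAdvanceInterruptibly(phase, 100, TimeUnit.MILLISECONDS);
            return false;              // returned normally, so it did not block
        } catch (TimeoutException expected) {
            return true;               // nobody arrived, so the wait timed out
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during sketch", e);
        }
    }

    // Phase number seen by a waiter once unpause() has arrived.
    static int phaseAfterUnpause() {
        Phaser phaser = new Phaser(1);
        phaser.arrive();                    // constructor step
        int phase = phaser.getPhase();      // pause()
        phaser.arrive();                    // unpause(): advances the phaser again
        return phaser.awaitAdvance(phase);  // captured phase is stale, returns at once
    }

    public static void main(String[] args) {
        System.out.println(blocksWhilePaused() + " " + phaseAfterUnpause());
    }
}
```

    One design point worth knowing: plain awaitAdvance does not respond to interruption, so a take() that should stay interruptible while paused could call awaitAdvanceInterruptibly(phase) instead.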