I was going through internal implementation of LinkedBlockingQueue put(E e) and take() function.
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
**notEmpty.signal();**
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
**notFull.signal();**
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
In both methods didn't get why signal() on condition called after a comparison with capacity. If any one can explain it, would be highly appreciable.
Here is one situation I can think about using:
if (c > 1)
notEmpty.signal();
Suppose the queue is empty, and there are 3 threads, thread_1, thread_2, thread_3.
take()
, get blocked at notEmpty.await()
.take()
, get blocked at notEmpty.await()
.take()
, get blocked at notEmpty.await()
.Then, there come other 3 threads, thread_4, thread_5, thread_6.
put()
, add an element in the queue, and signal
thread_1.takeLock
, and try take the first element.put()
, add another element in the queue, and signal
thread_2.takeLock
, but thread_1 is holding the lock now, so it has to wait.InterruptedException
and terminated.c > 1
in code above occurs. If thread_1 do not call signal
, thread_3 can not wake up and take the second element.signal
, thread_3 wake up, and take the second element.