I know that typically for producer/consumer pairs like this, a blocking queue should be used. What I want here is only to better understand memory consistency in Java, the interaction between concurrent data structures and locks, and what exactly the imprecision is when determining the size of ConcurrentLinkedQueue.
The question is: does the algorithm below ensure that anything produced is consumed, as it would with a plain, non-thread-safe queue? Note: I ran it several times and this was always the case.
import java.util.concurrent.ConcurrentLinkedQueue;

public class Produce extends Thread {
    @Override
    public void run() {
        synchronized(Main.queue) {
            Main.queue.add(1);
            Main.queue.notifyAll();
        }
    }
}

public class Consume extends Thread {
    @Override
    public void run() {
        synchronized(Main.queue) {
            while(true) {
                while(!Main.queue.isEmpty()) {
                    Main.queue.poll();
                    System.out.println("consumed");
                }
                System.out.println("empty");
                try {
                    Main.queue.wait();
                } catch(InterruptedException e) {
                }
            }
        }
    }
}

public class Main {
    public static final ConcurrentLinkedQueue<Integer> queue =
        new ConcurrentLinkedQueue<>();

    public static void main(String[] args) {
        (new Consume()).start();
        (new Produce()).start();
    }
}
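The answer to your question is yes. The consumer will see all of the updates, because both threads synchronize on the same monitor (Main.queue): releasing that lock in the producer happens-before the consumer reacquires it.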
However:
This is not a sensible implementation. It looks like you are using the polling approach with wait / notify so that you don't need a busy loop to wait for the queue to become nonempty. But a better (simpler, more efficient) approach would be to use a BlockingQueue instead and call its blocking take() method.
For what it is worth, you are negating any possible scalability advantages of using ConcurrentLinkedQueue by using the queue object as a mutex to do wait / notify signalling. (This would also apply if you used a different object as the mutex. The problem is the mutual exclusion!)
If you are going to do it this way (for whatever reason), a notify() would be preferable to a notifyAll(). Only one consumer is going to be able to consume that (single) element you added to the queue. Waking up all of the consumers is unnecessary.
It is not a good idea to extend Thread. A better way is to put your business logic into a Runnable (or a lambda) which you pass as a Thread constructor parameter. Read: "implements Runnable" vs "extends Thread" in Java
You also were interested in:
... what is exactly the imprecision when determining the size of ConcurrentLinkedQueue.
The answer to that is in the javadoc for the size() method of ConcurrentLinkedQueue:
"Beware that, unlike in most collections, this method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires an O(n) traversal."
"Additionally, if elements are added or removed during execution of this method, the returned result may be inaccurate. Thus, this method is typically not very useful in concurrent applications."
In other words, size() counts the elements by traversing the queue, and it may not give an accurate answer if elements are being added or removed while the traversal is in progress.
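A rough sketch of what that means in practice, assuming a few producer threads that only add elements (SizeDemo and sampleSizes are names made up for this example): while the producers are running, each size() call returns some value between 0 and the final total, and it may already be stale by the time it returns. Once all producers have been joined, the count is exact.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class SizeDemo {

    // Hypothetical helper: returns {smallest sampled size, largest sampled
    // size, size after all producers have finished}.
    public static int[] sampleSizes(int producers, int perProducer) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        Thread[] threads = new Thread[producers];
        for (int t = 0; t < producers; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    queue.add(i);
                }
            });
            threads[t].start();
        }

        int min = Integer.MAX_VALUE;
        int max = 0;
        for (int s = 0; s < 100; s++) {
            // O(n) traversal; the result is only a snapshot and may be
            // out of date immediately, since producers keep adding.
            int size = queue.size();
            min = Math.min(min, size);
            max = Math.max(max, size);
        }

        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        // No concurrent modification any more, so this count is exact.
        return new int[] {min, max, queue.size()};
    }

    public static void main(String[] args) {
        int[] r = sampleSizes(4, 10_000);
        System.out.println("sampled sizes ranged from " + r[0] + " to " + r[1]
                + ", final size " + r[2]);
    }
}
```

The sampled values vary from run to run; only the final size (producers times perProducer) is deterministic. If all you need to know is whether there is anything to consume, isEmpty() avoids the full traversal.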