In the code below, I have a question about what happens after I call wait(). In my code, I am returning a value right after calling wait(); what does this actually do? I thought that calling wait() suspends the current thread, but what happens to the value i passed to addWorkItem(Integer i) if wait() is called without returning false? You can see in the producer thread that it adds i to a retry buffer if it couldn't be added to the deque. If I don't return false after wait(), does the value i just get lost, or is it still there once the thread wakes up?
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ConsumerProducer2 {
    private static int QUEUE_SIZE = 10;
    private Deque<Integer> queue = new ArrayDeque<Integer>(QUEUE_SIZE);

    public synchronized boolean addWorkItem(Integer i) {
        while (queue.size() >= QUEUE_SIZE) {
            try {
                wait();
                return false; // WHAT HAPPENS HERE?
            } catch (InterruptedException ex) {}
        }
        queue.addLast(i);
        notify();
        return true;
    }

    public synchronized Integer getWork() {
        while (queue.size() == 0) {
            try {
                wait();
                return null; // WHAT HAPPENS HERE?
            } catch (InterruptedException ex) {
            }
        }
        Integer i = queue.removeFirst();
        notify();
        return i;
    }

    public static void main(String[] args) {
        new ConsumerProducer2().go();
    }

    public void go() {
        ConsumerThread ct = new ConsumerThread();
        ct.start();
        ConsumerThread ct2 = new ConsumerThread();
        ct2.start();
        ProducerThread pt = new ProducerThread();
        pt.start();
    }

    class ConsumerThread extends Thread {
        public void run() {
            while(true) {
                Integer work = getWork();
                if (work == null) {
                } else {
                    System.out.println("Thread: " + this.getId() + " received work: " + work);
                }
            }
        }
    }

    class ProducerThread extends Thread {
        private List<Integer> retryList = new ArrayList<Integer>();

        public void run() {
            while(true) {
                Integer currWork;
                if (retryList.size() == 0) {
                    currWork = (int) (Math.random() * 100);
                } else {
                    currWork = retryList.remove(0);
                    System.out.println("Thread: " + this.getId() + " retrying old work: " + currWork);
                }
                if (!addWorkItem(currWork)) {
                    System.out.println("Thread: " + this.getId() + " could not add work (because buffer is probably full): " + currWork);
                    retryList.add(currWork);
                } else {
                    System.out.println("Thread: " + this.getId() + " added work to queue: " + currWork);
                }
            }
        }
    }
}
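Calling wait() suspends the thread but leaves its parameters and local variables untouched, so i is still there when the thread wakes up. Having the producer maintain a retry buffer does keep the i value from getting lost when you return false, but this still isn't a good way to write the method.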
Returning from inside the while loop doesn't make sense. You check the size of the queue, and if it's full you wait until you get a notification that the size of the queue changed, then return false anyway, without looking at the queue again. The waiting doesn't accomplish anything.
The point of waiting in addWorkItem is to delay your thread until the queue has room for the new value. You should wait inside a loop: when wait returns, your thread has reacquired the lock, and it re-checks the condition (queue size >= max) to see whether it can add the item yet.
Once the thread has exited the while loop, it holds the lock and knows there is room in the queue for the new item (no other thread can change the size of the queue while this thread holds the lock), so it can go ahead and add the value to the queue.
You are catching the InterruptedException in an unproductive way: you catch it, don't restore the interrupt flag, and go back to the top of the while loop. You should use the interruption to stop waiting and get out of the method. Letting InterruptedException be thrown here would make more sense; the thread running the method knows better how to handle the interruption than this object does.
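To illustrate the point about interruption, here is a minimal sketch in which the waiting method simply declares InterruptedException and the thread that runs it decides what to do. The class InterruptDemo, its ready flag, and the methods awaitReady and runAndInterrupt are hypothetical names made up for this example; they are not part of the code in the question.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of the original code.
class InterruptDemo {
    private final Object lock = new Object();
    private boolean ready = false; // never set here, so the worker waits until interrupted

    // Waits for 'ready' and lets InterruptedException propagate to the caller.
    void awaitReady() throws InterruptedException {
        synchronized (lock) {
            while (!ready) {
                lock.wait();
            }
        }
    }

    // Starts a worker, interrupts it, and reports whether the worker saw the interruption.
    static boolean runAndInterrupt() throws InterruptedException {
        InterruptDemo demo = new InterruptDemo();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                demo.awaitReady();
            } catch (InterruptedException e) {
                // The thread decides what interruption means: here, record it and stop.
                sawInterrupt.set(true);
            }
        });
        worker.start();
        worker.interrupt(); // wait() throws even if the interrupt arrives before the wait begins
        worker.join();
        return sawInterrupt.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("worker stopped by interrupt: " + runAndInterrupt());
    }
}
```

Because the method does not swallow the exception, interrupting the worker is enough to get it out of the wait loop and let it shut down cleanly.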
You shouldn't assume wait returns only when the thread is notified; it can return without a notification (a so-called spurious wakeup). That's one of the reasons to call wait in a loop.
Reworked version:
public synchronized boolean addWorkItem(Integer i) throws InterruptedException {
    while (queue.size() >= QUEUE_SIZE) {
        wait();
    }
    queue.addLast(i);
    notify();
    return true;
}
If you want an excuse to return false from this you could make the method return false if the queue doesn't make room for the new entry within some time frame (having a timeout can be a good thing in a lot of real-life situations):
public synchronized boolean addWorkItem(Integer i) throws InterruptedException {
    final long maxWaitMillis = 60L * 1000;
    // nanoTime is monotonic, so the deadline is unaffected by wall-clock changes
    final long deadline = System.nanoTime() + maxWaitMillis * 1_000_000L;
    while (queue.size() >= QUEUE_SIZE) {
        long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
        if (remainingMillis <= 0) {
            return false; // timed out and the queue is still full
        }
        wait(remainingMillis); // only wait for what is left; wait(0) would block forever
    }
    queue.addLast(i);
    notify();
    return true;
}
With this version the producer will still fall back on the retry buffer (with the first reworked version it never will, since that method always returns true), but probably far less often than it does now.
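The getWork method in the question has the same problem (it returns null right after waking up), and the same fix applies to it. Here is a minimal, self-contained sketch of both methods together. The class name BoundedWorkQueue and the capacity constructor parameter are assumptions made for this example, and notifyAll is used instead of notify as a conservative choice, since producers and consumers wait on the same monitor.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical class name; a trimmed-down stand-in for ConsumerProducer2.
class BoundedWorkQueue {
    private final int capacity;
    private final Deque<Integer> queue = new ArrayDeque<>();

    BoundedWorkQueue(int capacity) {
        this.capacity = capacity;
    }

    // Blocks until there is room, then adds the item.
    public synchronized boolean addWorkItem(Integer i) throws InterruptedException {
        while (queue.size() >= capacity) {
            wait(); // releases the lock while waiting, reacquires it before returning
        }
        queue.addLast(i);
        notifyAll();
        return true;
    }

    // Blocks until an item is available, then removes and returns it.
    public synchronized Integer getWork() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        Integer i = queue.removeFirst();
        notifyAll();
        return i;
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedWorkQueue q = new BoundedWorkQueue(2);
        Thread consumer = new Thread(() -> {
            try {
                for (int n = 0; n < 5; n++) {
                    System.out.println("received work: " + q.getWork());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
            }
        });
        consumer.start();
        for (int n = 0; n < 5; n++) {
            q.addWorkItem(n); // blocks whenever the queue already holds 2 items
        }
        consumer.join();
    }
}
```

With both methods blocking until they can succeed, the consumer no longer has to handle null and the producer no longer needs a retry list.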
Another thing: you have producer and consumer threads waiting on the same monitor, and notify is called in both cases. Since notify wakes up only one thread, a thread can get a notification that isn't relevant to it: the notified thread wakes up, finds that the condition it is waiting for still doesn't hold, and waits some more, while another thread that the notification actually matters to never finds out about it. There are different ways to solve the problem. You can
give producers and consumers separate conditions to wait on (for example, one ReentrantLock with two Condition objects; two unrelated locks could not safely guard the same deque),
reduce the timeout passed into the wait method so you're less dependent on getting notified, or
use notifyAll (less performant but a quick fix).
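One way to read the first option is a single ReentrantLock with two Condition objects, so that producers and consumers wait separately and each signal reaches the right kind of thread. The sketch below assumes that reading. The class name ConditionWorkQueue and the condition names notFull and notEmpty are made up for this example, while ReentrantLock and Condition come from java.util.concurrent.locks.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class name; same behavior as the synchronized version, different signalling.
class ConditionWorkQueue {
    private final int capacity;
    private final Deque<Integer> queue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    ConditionWorkQueue(int capacity) {
        this.capacity = capacity;
    }

    public void addWorkItem(Integer i) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= capacity) {
                notFull.await(); // releases the lock while waiting
            }
            queue.addLast(i);
            notEmpty.signal(); // wakes a consumer, never another producer
        } finally {
            lock.unlock();
        }
    }

    public Integer getWork() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            Integer i = queue.removeFirst();
            notFull.signal(); // wakes a producer, never another consumer
            return i;
        } finally {
            lock.unlock();
        }
    }
}
```

The await calls still sit inside while loops, for the same reasons as with wait: a thread can wake up without a signal, and the condition has to be re-checked once the lock is held again.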