I am trying to solve the Producer/Consumer problem in Java using multi-threading, but I keep getting stuck in a deadlock and I can't figure out why.
BoundedBuffer.java
public class BoundedBuffer {
    private final int[] buffer;
    private final int N;
    private int in = 0;
    private int out = 0;
    private int itemCount = 0;

    public BoundedBuffer(int size) {
        // One slot is kept empty so that "full" and "empty" can be told apart.
        N = size + 1;
        buffer = new int[N];
    }

    public void insert(Producer producer, int item) {
        synchronized (producer) {
            while ((in + 1) % N == out) {
                try {
                    producer.wait();
                } catch (InterruptedException e) {}
            }
            buffer[in] = item;
            in = (in + 1) % N;
            itemCount++;
        }
    }

    public int remove(Consumer consumer) {
        synchronized (consumer) {
            while (in == out) {
                try {
                    consumer.wait();
                } catch (InterruptedException e) {}
            }
            int item = buffer[out];
            buffer[out] = 0; // clear the slot (an int cannot be set to null)
            out = (out + 1) % N;
            itemCount--;
            return item;
        }
    }
}
Producer.java
public class Producer extends Thread {
    private int total = 0;
    private BoundedBuffer buffer;
    private int uniqueItem = 0;

    public Producer(int total, BoundedBuffer b) {
        this.total = total;
        this.buffer = b;
    }

    public void run() {
        for (int i = 0; i < total; i++) {
            try {
                Thread.sleep((int)(Math.random() * 100));
            } catch (InterruptedException e) {}
            buffer.insert(this, uniqueItem++);
            this.notifyAll();
        }
    }
}
Consumer.java
public class Consumer extends Thread {
    private int total = 0;
    private BoundedBuffer buffer;

    public Consumer(int total, BoundedBuffer b) {
        this.total = total;
        this.buffer = b;
    }

    public void run() {
        for (int i = 0; i < total; i++) {
            try {
                Thread.sleep((int)(Math.random() * 100));
            } catch (InterruptedException e) {}
            buffer.remove(this);
            this.notifyAll();
        }
    }
}
The code runs for a while, and with debug output enabled ("This producer is trying to insert this item...") I see the expected pattern, but every now and then I get this error:
Exception in thread "Thread-2" java.lang.IllegalMonitorStateException
at java.lang.Object.notifyAll(Native Method)
at Consumer.run(Consumer.java:19)
Some time after that, a deadlock occurs.
I think my issue is due to the key I am synchronizing on, the producer and consumer instances. I'm not sure what else I could synchronize the method on, and that is what is stumping me most. I believe I could have a key for each consumer/producer to share, but I'm not sure how that would work.
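You should synchronize on the BoundedBuffer itself, and use wait inside the synchronized blocks to release the lock and wait until the other thread notifies the sleeping one. notifyAll must also be called while holding that same lock; calling it on an object whose monitor the current thread does not own throws IllegalMonitorStateException.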
There's a complete example in this tutorial: https://docs.oracle.com/javase/tutorial/essential/concurrency/guardmeth.html
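As a rough sketch of that advice (this is not the tutorial's code, just one way the class from the question could be reworked), the buffer can lock on itself by declaring its methods synchronized. The changes below are assumptions made for illustration: the Producer and Consumer parameters are dropped because they are no longer the lock, both methods rethrow InterruptedException instead of swallowing it, the full/empty checks use itemCount, and BoundedBufferDemo is a hypothetical driver class.

```java
// Sketch: a bounded buffer that uses its own monitor for wait/notifyAll.
class BoundedBuffer {
    private final int[] buffer;
    private int in = 0;
    private int out = 0;
    private int itemCount = 0;

    BoundedBuffer(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        buffer = new int[size];
    }

    // synchronized on "this", so every producer and consumer shares one lock
    public synchronized void insert(int item) throws InterruptedException {
        while (itemCount == buffer.length) {
            wait(); // buffer full: release the lock until someone removes
        }
        buffer[in] = item;
        in = (in + 1) % buffer.length;
        itemCount++;
        notifyAll(); // still holding the lock, so this is legal
    }

    public synchronized int remove() throws InterruptedException {
        while (itemCount == 0) {
            wait(); // buffer empty: release the lock until someone inserts
        }
        int item = buffer[out];
        out = (out + 1) % buffer.length;
        itemCount--;
        notifyAll(); // wake producers that are waiting for free space
        return item;
    }
}

// Hypothetical driver: one producer thread, the main thread consumes.
class BoundedBufferDemo {
    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer buffer = new BoundedBuffer(3);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    buffer.insert(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 10; i++) {
            System.out.println("removed " + buffer.remove());
        }
        producer.join();
    }
}
```

With a buffer like this, Producer.run() and Consumer.run() would call buffer.insert(uniqueItem++) and buffer.remove() and would no longer call notifyAll() themselves, since the notification happens inside the buffer while the lock is held.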
Also, note that you can use a BlockingQueue to implement the producer-consumer pattern much more simply. Its documentation includes a complete example:
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html
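To give an idea of what that looks like, here is a minimal sketch using java.util.concurrent.ArrayBlockingQueue (it is not copied from the linked documentation). The class name BlockingQueueDemo, the method runOnce, and the capacity and item counts are made up for illustration. put blocks while the queue is full and take blocks while it is empty, so no explicit synchronized, wait, or notifyAll is needed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: one producer and one consumer sharing a queue.
class BlockingQueueDemo {

    // Produces the integers 0..total-1, consumes them on another thread,
    // and returns the sum of everything the consumer received.
    static int runOnce(int capacity, int total) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int[] sum = {0}; // written only by the consumer, read after join()

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    sum[0] += queue.take(); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce(5, 10)); // prints 45 (0 + 1 + ... + 9)
    }
}
```

The queue takes over the role of BoundedBuffer entirely, which is why the Producer and Consumer classes from the question would shrink to little more than a loop around put or take.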