I have been trying to implement the producer consumer pattern. If both producer and consumer are running indefinitely how should one try to stop them?
I have been trying to test the status of isInterrupted()
but the following code does not guarantee all threads to stop.
public class Monitor {
private int value;
private boolean readable = false;
public synchronized void setVal(int value) {
while (readable) {
try {
this.wait();
} catch (InterruptedException e) {
break;
}
}
if (!Thread.currentThread().isInterrupted()) {
this.readable = true;
this.value = value;
this.notifyAll();
}
}
public synchronized int getVal() {
while (!readable) {
try {
this.wait();
} catch (InterruptedException e) {
break;
}
}
if (!Thread.currentThread().isInterrupted()) {
this.readable = false;
this.notifyAll();
}
return this.value;
}
}
The producer class looks like this:
import java.util.Random;
public class Producer implements Runnable {
private Monitor monitor;
private Random r = new Random();
private String name;
public Producer(Monitor m) {monitor = m;}
public void run() {
name = Thread.currentThread().getName();
while (!Thread.currentThread().isInterrupted()) {
int value = r.nextInt(1000);
monitor.setVal(value);
System.out.println("PRODUCER: " + name + " set " + value);
}
System.out.println("PRODUCER: " + name + " interrupted");
}
}
The consumer
public class Consumer implements Runnable {
private Monitor monitor;
private String name;
public Consumer(Monitor m) {monitor = m;}
public void run() {
name = Thread.currentThread().getName();
while (!Thread.currentThread().isInterrupted()) {
int value = monitor.getVal();
System.out.println("CONSUMER: " + name + " got " + value);
}
System.out.println("CONSUMER: " + name + " interrupted");
}
}
And the main:
public class Main {
public static void main(String[] args) {
final int n = 2;
Monitor m = new Monitor();
Thread[] producers = new Thread[n];
Thread[] consumers = new Thread[n];
for (int i = 0; i < n; i++) {
producers[i] = new Thread(new Producer(m));
producers[i].start();
consumers[i] = new Thread(new Consumer(m));
consumers[i].start();
}
// try {
// Thread.sleep(1);
// } catch (InterruptedException e) {}
for (int i = 0; i < n; i++) {
producers[i].interrupt();
consumers[i].interrupt();
}
}
}
I get the following results
PRODUCER: Thread-0 set 917
CONSUMER: Thread-1 got 917
PRODUCER: Thread-2 set 901
PRODUCER: Thread-0 set 29
CONSUMER: Thread-3 interrupted
CONSUMER: Thread-1 got 29
CONSUMER: Thread-1 interrupted
PRODUCER: Thread-2 set 825
...program hangs
and
PRODUCER: Thread-0 set 663
CONSUMER: Thread-1 got 663
PRODUCER: Thread-0 set 129
CONSUMER: Thread-1 got 129
PRODUCER: Thread-2 set 93
PRODUCER: Thread-2 interrupterd
CONSUMER: Thread-3 interrupted
PRODUCER: Thread-0 set 189
PRODUCER: Thread-0 interrupterd
CONSUMER: Thread-1 got 129
...program hangs
etc...
There is clearly something wrong. Why am I not registering the calls the interrupt
on a consistent basis?
It is perhaps surprising that catching an InterruptedException
does not mean that the interrupted flag is set on the thread.
The interrupted flag and InterruptedException
are two completely separate ways of indicating that an interrupt has occurred:
InterruptedException
without first checking whether the thread has been interrupted.InterruptedException
.In order to correctly preserve the fact that the thread was interrupted (or, at least, that an InterruptedException
was caught), you should explicitly re-interrupt the thread in your catch block:
// ...
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
// ...
This sets the interrupted flag, so your isInterrupted()
check should now work.