Tags: java, multithreading, interrupt, producer-consumer, interrupted-exception

Stopping threads that run indefinitely


I have been trying to implement the producer-consumer pattern. If both the producer and the consumer run indefinitely, how should one stop them?

I have been testing the status of isInterrupted(), but the following code does not guarantee that all threads stop.

public class Monitor {

    private int value;
    private boolean readable = false;

    public synchronized void setVal(int value) {
        while (readable) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                break;
            }
        }

        if (!Thread.currentThread().isInterrupted()) {
            this.readable = true;
            this.value = value;
            this.notifyAll();
        }
    }

    public synchronized int getVal() {
        while (!readable) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                break;
            }
        }
        if (!Thread.currentThread().isInterrupted()) {
            this.readable = false;
            this.notifyAll();
        }
        return this.value;
    }
}

The producer class looks like this:

import java.util.Random;

public class Producer implements Runnable {

    private Monitor monitor;
    private Random r = new Random();
    private String name;

    public Producer(Monitor m) {monitor = m;}

    public void run() {
        name = Thread.currentThread().getName();
        while (!Thread.currentThread().isInterrupted()) {
            int value = r.nextInt(1000);
            monitor.setVal(value);
            System.out.println("PRODUCER: " + name + " set " + value);
        }
        System.out.println("PRODUCER: " + name + " interrupted");
    }
}

The consumer class:

public class Consumer implements Runnable {

    private Monitor monitor;
    private String name;

    public Consumer(Monitor m) {monitor = m;}

    public void run() {
        name = Thread.currentThread().getName();
        while (!Thread.currentThread().isInterrupted()) {
            int value = monitor.getVal();
            System.out.println("CONSUMER: " + name + " got " + value);
        }
        System.out.println("CONSUMER: " + name + " interrupted");
    }
}

And the main:

public class Main {

    public static void main(String[] args) {

        final int n = 2;
        Monitor m = new Monitor();

        Thread[] producers = new Thread[n];
        Thread[] consumers = new Thread[n];

        for (int i = 0; i < n; i++) {
            producers[i] = new Thread(new Producer(m));
            producers[i].start();
            consumers[i] = new Thread(new Consumer(m));
            consumers[i].start();
        }

//        try {
//            Thread.sleep(1);
//        } catch (InterruptedException e) {}

        for (int i = 0; i < n; i++) {
            producers[i].interrupt();
            consumers[i].interrupt();
        }
    }
}

I get the following results:

PRODUCER: Thread-0 set 917
CONSUMER: Thread-1 got 917
PRODUCER: Thread-2 set 901
PRODUCER: Thread-0 set 29
CONSUMER: Thread-3 interrupted
CONSUMER: Thread-1 got 29
CONSUMER: Thread-1 interrupted
PRODUCER: Thread-2 set 825
...program hangs

and

PRODUCER: Thread-0 set 663
CONSUMER: Thread-1 got 663
PRODUCER: Thread-0 set 129
CONSUMER: Thread-1 got 129
PRODUCER: Thread-2 set 93
PRODUCER: Thread-2 interrupted
CONSUMER: Thread-3 interrupted
PRODUCER: Thread-0 set 189
PRODUCER: Thread-0 interrupted
CONSUMER: Thread-1 got 129
...program hangs

etc...

There is clearly something wrong. Why are the calls to interrupt() not registered consistently?


Solution

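  • Perhaps surprisingly, catching an InterruptedException does not mean that the interrupted flag is still set on the thread. Blocking methods such as Object.wait() clear the flag when they throw the exception, so an isInterrupted() check made after the catch block returns false.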

    The interrupted flag and InterruptedException are two completely separate ways of indicating that an interrupt has occurred:

    • You can throw an InterruptedException without first checking whether the thread has been interrupted.
    • You can set the interrupted flag without catching an InterruptedException.

    In order to correctly preserve the fact that the thread was interrupted (or, at least, that an InterruptedException was caught), you should explicitly re-interrupt the thread in your catch block:

    //  ...
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      break;
    }
    // ...
    

    This sets the interrupted flag again, so the isInterrupted() checks in setVal(), getVal(), and the run() loops should now see the interrupt.
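
The behaviour described above can be observed in isolation. The following is a minimal sketch, not part of the original code: the class name InterruptFlagDemo, the method observeFlags(), and the private lock object are all hypothetical names chosen for illustration. It records the value of isInterrupted() inside the catch block, then again after the thread re-interrupts itself.

```java
public class InterruptFlagDemo {

    // Values observed by the waiting thread (hypothetical names for this sketch).
    // They are safe to read after join() returns.
    static boolean flagInCatch;
    static boolean flagAfterRestore;

    static void observeFlags() {
        final Object lock = new Object();

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    // Nothing ever calls notify(); the loop guards against
                    // spurious wakeups, so only an interrupt ends the wait.
                    while (true) {
                        lock.wait();
                    }
                } catch (InterruptedException e) {
                    // wait() cleared the interrupted status before throwing.
                    flagInCatch = Thread.currentThread().isInterrupted();
                    // Restore the status so later checks can see the interrupt.
                    Thread.currentThread().interrupt();
                    flagAfterRestore = Thread.currentThread().isInterrupted();
                }
            }
        });

        waiter.start();
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            // Same rule applies to the calling thread: restore, then give up.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        observeFlags();
        System.out.println("flag inside catch:  " + flagInCatch);      // false
        System.out.println("flag after restore: " + flagAfterRestore); // true
    }
}
```

In the Monitor from the question, the same two lines (re-interrupt, then break) would go into the catch blocks of both setVal() and getVal().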