Search code examples
javamultithreadingdeadlock

Java Producer Consumer model always deadlocking


So I had this assingment to make a Producer Consumer model for homework, and I finished working on an extremely crude version (but the best I could do with my current java skills).

It seems to work but it runs into the Deadlocking problem http://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem that the Wiki link describes, which is that basically for some reason eventually all Threads fall asleep and fail to wake each other entering eternal sleep cycles.

I'm not really sure what exactly in my code is causing this as I would've thought the way I wrote it this wouldn't occur, but then again I still don't 100% understand how Threads work.

Here's my code:

package boundedbuffer;

import java.util.LinkedList;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.Queue;


public class BoundedBuffer {


    public static int CapacityCheck = 0;


    public static void main(String[] args){


        MessageQueue queue = new MessageQueue(3); // <-- max capacity of queue is given here as 3

        Thread t1 = new Thread(new Producer(queue));
        Thread t2 = new Thread(new Producer(queue));
        Thread t3 = new Thread(new Producer(queue));  
        Thread t4 = new Thread(new Consumer(queue));
        Thread t5 = new Thread(new Consumer(queue));
        Thread t6 = new Thread(new Consumer(queue));
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
        t6.start();

    }
}

public class Producer implements Runnable{

    private MessageQueue queue;

    private static String msgs[] = {
            "some test message",
            "long message",
            "short message",
            "yet another message"
        };

    public Producer(MessageQueue queue){
        this.queue = queue;
    }

    @Override
    public synchronized void run() {
        while(true){
            Random rand = new Random();
            int wait = rand.nextInt(3000);
            int index = rand.nextInt(4);
            try {
                Thread.sleep(wait);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE,
                null, ex);
            }         
            if(BoundedBuffer.CapacityCheck < queue.capacity){ 
                System.out.println("Puts into buffer: " + msgs[index]);
                queue.put(msgs[index]);
                BoundedBuffer.CapacityCheck++;
                notifyAll();
            }else{
                try {
                    wait();
                } catch (InterruptedException ex) {
                    Logger.getLogger(Producer.class.getName()).log(Level.SEVERE,                        null, ex);            
                }
            }
        }

    }

}

public class Consumer implements Runnable{

    private MessageQueue queue;

    public Consumer(MessageQueue queue){
        this.queue = queue;
    }

    @Override
    public synchronized void run() {
        while(true){
            Random rand = new Random();
            int wait = rand.nextInt(3000);
            try {
                Thread.sleep(wait); 
            } catch (InterruptedException ex) {
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            }
            String msg = queue.get();
            if(msg == null){
                try {
                    wait();
                } catch (InterruptedException ex) {
                    Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
            queue.get();
            BoundedBuffer.CapacityCheck--;
            System.out.println("Takes out of buffer: " + msg);
            notifyAll();        
        }
    }


}


public class MessageQueue  {

    public final int capacity;
    private final Queue<String> messages = new LinkedList<>();


    public MessageQueue(int capacity) {
        this.capacity = capacity;
    }

    public void put(String msg){
        this.messages.add(msg);
    }

    public String get(){ 
        if(messages.isEmpty()){ 
            return null;
        }else{
            String msg = messages.element();
            messages.remove();
            return msg;
        }
    }
}

Another minor but interesting problem is that I either NEVER or maybe only once saw a case where "taking an item out" happened more than once after each other. Putting items in always happens either once, twice, or up to three times after one another (I made the buffer size 3 for this example so it can't happen 4 times) but taking out an item happens only maybe ONCE and then afterwards it always puts one back, takes one out, puts one back. I've never seen after 3 items are put in: takes one out, takes one out again for example.

This might be a problem or an error. Idk.

I also think that using Synchronized on the run methods feels a bit off but if I take it out then I get an IllegalMonitorState Exception.

I'm using multiple producers and multiple consumers because that's how my teacher asked us to do it.


Solution

  • All your thread stall's because you are obtaining mutex on different producers and consumers that you pass to threads.

    You synchronize on run method meaning obtaining mutex on different object while calling wait method and entering in blocked state assuming someone would notify the thread to come back. Even though if other threads notify they notify on this (individual producers or consumers) instance rather than shared instance between producer and consumer.

    Share common instance like you are doing MessageQueue and synchronize on Queue rather than on run method.