
Custom Producer Consumer


I wrote a producer-consumer program. Sometimes the output is correct, and sometimes an exception occurs in the middle of the output, although the program still runs to completion.

I get an IndexOutOfBoundsException, and I believe the reason is the following: when the queue is empty, all three consumer threads go into the waiting state. When the producer adds an item and notifies all the waiting threads, one consumer thread removes the item, and then another consumer thread wakes up and tries to remove an item from the now-empty queue, which causes the exception. I know it's a race condition, but I can't figure out how to avoid it. Any ideas or suggestions are welcome.

Another question: I can't figure out a way to terminate this program gracefully. For now I call System.exit(0) when the last item is produced. Any better ideas are welcome.

P.S.

I don't want to use any of Java's built-in thread-safe classes; I want to try the wait()/notify() mechanism.

class Producer implements Runnable
{
    private Queue q;
    Producer(Queue q)
    {
        this.q = q;
    }
    @Override
    public void run() { 
        for(int i =0;i<50;i++)
            try {
                q.add(new Integer(i));
            } catch (InterruptedException e) {  
                e.printStackTrace();
            }   
    }
}
class Consumer extends Thread
{
    private Queue q;
    Consumer(Queue q)
    {
        this.q = q;
    }   
    public void run()
    {       
        try {
            while(true)
            {
            System.out.println(Thread.currentThread().getName()+"-"+ q.get());
            }
        } catch (InterruptedException e) {      
            e.printStackTrace();
        }
    }
}
public class Main 
{
    public static void main(String args[])
    {
    Queue q = new Queue();
    Producer runObj = new Producer(q);
    Thread producerObj = new Thread(runObj,"Producer");
    producerObj.start();    
    Consumer c1 = new Consumer(q);
    Consumer c2 = new Consumer(q);
    Consumer c3 = new Consumer(q);
    c1.setName("c1");
    c2.setName("c2");
    c3.setName("c3");
    c1.start();
    c2.start();
    c3.start(); 
    }
}

Queue Class:

import java.util.ArrayList;

public class Queue {

    private ArrayList<Integer> itemQ;
    int qSize = 5;

    Queue()
    {
        itemQ = new ArrayList<Integer>();
    }

    public synchronized void add(Integer item) throws InterruptedException
    {

        if(itemQ.size() == qSize)
        {
            System.out.println("Q is full");
            wait();
        }
        System.out.println(item);
        if(item.equals(new Integer(49)))
        {
            System.out.println("Out Of Stock");
            System.exit(0);

        }
        itemQ.add(item);
        notifyAll();
    }

    public synchronized Integer get() throws InterruptedException
    {
        if(itemQ.isEmpty())
        {
            wait();
        }   
        Integer item = itemQ.remove(0);     
        notify();       
        return item;
    }
}

Solution

  • You need to change the if tests in Queue.add and Queue.get to while loops. For example, change the body of the Queue.get method to:

    while (itemQ.isEmpty()) {
        wait();
    }
    Integer item = itemQ.remove(0);
    notify();
    return item;
    

    When you call wait you give up the lock, and once you reacquire it you need to check again whether the condition you were waiting on still holds. When the producer adds an item and calls notifyAll, every consumer waiting on the queue is woken, and they then compete for the lock; any one of them could get it first. So in the interval between the time a thread is woken from wait and the time it reacquires the lock, another consumer can sneak in and remove the item from the queue, invalidating your assumption that the queue is not empty.

    Spurious wake-ups are also possible: a thread can return from wait even though no thread notified it. That's another reason to re-check the state upon waking up.
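To make the advice above concrete, here is a minimal sketch of the queue with the loop applied to both methods. The names BoundedQueue and BoundedQueueDemo are hypothetical and not from the question. The sketch also makes two assumptions of its own: get() calls notifyAll() rather than notify(), since producers and consumers wait on the same monitor, and each consumer in the demo takes a fixed number of items purely so that the program ends without System.exit(0).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical name; plays the role of the Queue class from the question.
class BoundedQueue {
    private final List<Integer> items = new ArrayList<>();
    private final int capacity;

    BoundedQueue(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void add(Integer item) throws InterruptedException {
        // Loop, not if: re-check the condition after every wake-up.
        while (items.size() == capacity) {
            wait();
        }
        items.add(item);
        notifyAll();
    }

    public synchronized Integer get() throws InterruptedException {
        // Another consumer may have emptied the queue before this thread
        // reacquired the lock, or the wake-up may have been spurious.
        while (items.isEmpty()) {
            wait();
        }
        Integer item = items.remove(0);
        // Assumption of this sketch: notifyAll, because producers and
        // consumers share this object's monitor.
        notifyAll();
        return item;
    }
}

// Hypothetical demo: one producer (the main thread) and three consumers.
class BoundedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BoundedQueue q = new BoundedQueue(5);
        int perConsumer = 10; // fixed quota so the demo ends on its own
        Thread[] consumers = new Thread[3];
        for (int c = 0; c < consumers.length; c++) {
            consumers[c] = new Thread(() -> {
                try {
                    for (int n = 0; n < perConsumer; n++) {
                        System.out.println(Thread.currentThread().getName() + "-" + q.get());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "c" + (c + 1));
            consumers[c].start();
        }
        for (int i = 0; i < perConsumer * consumers.length; i++) {
            q.add(i);
        }
        for (Thread t : consumers) {
            t.join();
        }
    }
}
```

With the while loops in place, a consumer that wakes up to an empty queue simply goes back to waiting instead of calling remove(0) on an empty list.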