Search code examples
javaproducer-consumer

producer - consume; how does the consumer stop?


So I have simulated my producer consumer problem and I have the code below. My question is this: how does the consumer stops if he's in constant while(true).

In the code below, I've added

                    if (queue.peek()==null)
                         Thread.currentThread().interrupt();

which works nicely in this example. But in my real world design, this doesn't work (sometimes it takes longer time to the producer to 'put' the data so the exception thrown in the consumer is incorrect. In general, I know I can put a 'poison' data such as Object is XYZ and I can check it in the consumer. But this poison makes the code really look bad. Wonder if anyone has a different approach.

public class ConsumerThread implements Runnable
{
 private BlockingQueue<Integer> queue;
 private String name;
 private boolean isFirstTimeConsuming = true;
 public ConsumerThread(String name, BlockingQueue<Integer> queue)
 {
    this.queue=queue;
    this.name=name;
 }

@Override
public void run()
{
    try
    {       
        while (true)
        {   
            if (isFirstTimeConsuming)
            {
                System.out.println(name+" is initilizing...");
                Thread.sleep(4000);
                isFirstTimeConsuming=false;
            }
            try{

                if (queue.peek()==null)
                    Thread.currentThread().interrupt();

                Integer data = queue.take();

                System.out.println(name+" consumed ------->"+data);
                Thread.sleep(70);    

            }catch(InterruptedException ie)
            {
                System.out.println("InterruptedException!!!!");
                break;
            }
        }

        System.out.println("Comsumer " + this.name + " finished its job; terminating.");

    }catch (InterruptedException e)
    {
        e.printStackTrace();
    } 
}

}


Solution

  • A: There is simply no guarantee that just because peek returns null, the producer has stopped producing. What if the producer simply got slowed down? Now, the consumer quits, and the producer keeps producing. So the 'peek' -> 'break' idea basically fails.

    B: Setting a 'done/run' flag from consumer and reading it in producer also fails, if:

    1. consumer checks the flag, finds it should keep running, then does a 'take'
    2. in meanwhile, producer was setting the flag to 'dont run'
    3. Now consumer blocks forever waiting for a ghost packet

    The opposite can also happen, and one packet gets left out un-consumed.

    Then to get around this, you will want to do additional synchronization with mutexes over and above the 'BlockingQueue'.

    C: I find 'Rosetta Code' to be fine source of deciding what is good practice, in situations like this:

    http://rosettacode.org/wiki/Synchronous_concurrency#Java

    The producer and consumer must agree upon an object (or an attribute in the object) that represents end of input. Then the producer sets that attribute in the last packet, and the consumer stops consuming it. i.e. what you referred to in your question as 'poison'.

    In the Rosetta Code example above, this 'object' is simply an empty String called 'EOF':

    final String EOF = new String();
    
    // Producer
    while ((line = br.readLine()) != null)
      queue.put(line);
    br.close();
    // signal end of input
    queue.put(EOF);
    
    // Consumer
    while (true)
      {
        try
          {
            String line = queue.take();
            // Reference equality
            if (line == EOF)
              break;
            System.out.println(line);
            linesWrote++;
          }
        catch (InterruptedException ie)
          {
          }
      }