Search code examples
javamultithreadingproducer-consumer

Is my approach to implement Producer-Consumer problem correct?


I have implemented producer problem using wait/notify combination. Could someone please let me know if my understanding on producer consumer problem is correct or not and if my implementation is correct/optimized?

Now i'm thinking how to implement the same problem using ExecutorService and CountDownLatch, ReentrantLock, CyclicBarrier? Is there any way to do it? Meanwhile I will try to see if I can implement the problem solution using the latch.

import java.util.ArrayList;
import java.util.EmptyStackException;
import java.util.Random;

public class ProducerConsumerProblem {

    private Object syncher = new Object();
    private volatile ArrayList<Integer> sharedBuffer = new ArrayList<Integer>();

    public static void main(String[] args) {

        ProducerConsumerProblem object = new ProducerConsumerProblem();

        Thread producerThread = new Thread(() -> {
            object.produceData();
        },"Producer");
        Thread consumerThread = new Thread(() -> {
            object.consumeData();
        },"Consumer");
        producerThread.start();
        consumerThread.start();
    }

    public void produceData() {
        Random randomNumber = new Random();
        while(true) {
            synchronized (syncher) {
                if(sharedBuffer.size() == 1) {
                    try {
                        //System.out.println("Producer waiting...");
                        syncher.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Integer producedElem = randomNumber.nextInt(10);
                System.out.println("+++ Produced: "+producedElem);
                sharedBuffer.add(producedElem);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                syncher.notify();
            }
        }
    }

    public void consumeData() {
        while(true) {
            synchronized (syncher) {
                while(sharedBuffer.size() == 0) {
                    try {
                        //System.out.println("Consumer waiting...");
                        syncher.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Integer consumedElem = sharedBuffer.stream().findAny().orElseThrow(()-> new EmptyStackException());
                System.out.println("--- Consumed: "+consumedElem);
                sharedBuffer.remove(consumedElem);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                syncher.notify();
            }
        }
    }
}

Solution

  • import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class ProducerConsumerProblemUsingBlockingQueue {
    
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(1);
    
        public static void main(String[] args) {
    
            ProducerConsumerProblemUsingBlockingQueue object = new ProducerConsumerProblemUsingBlockingQueue();
    
            Thread producerThread = new Thread(() -> {
                object.produceData(object.blockingQueue);
            },"Producer");
            Thread consumerThread = new Thread(() -> {
                object.consumeData(object.blockingQueue);
            },"Consumer");
            consumerThread.start();
            producerThread.start();
        }
    
        private void consumeData(BlockingQueue<Integer> blockingQueue) {
            for(int i = 0; i < 10; i++) {
                try {
                    System.out.println("Consumed: "+blockingQueue.take().intValue());
                    Thread.sleep(2000);
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void produceData(BlockingQueue<Integer> blockingQueue) {
            Random randomObject = new Random();
            for(int i = 0; i < 10; i++) {
                try {
                    int randomNumber = randomObject.nextInt(100);
                    System.out.println("Produced: "+randomNumber);
                    blockingQueue.put(randomNumber);
                    Thread.sleep(2000);
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }