I have implemented the producer-consumer problem using the wait/notify combination. Could someone please let me know whether my understanding of the producer-consumer problem is correct, and whether my implementation is correct and optimized?
Now I'm thinking about how to implement the same problem using ExecutorService, CountDownLatch, ReentrantLock, and CyclicBarrier. Is there any way to do it? Meanwhile, I will try to see whether I can implement the solution using the latch.
import java.util.ArrayList;
import java.util.EmptyStackException;
import java.util.Random;
/**
 * Producer/consumer demo built on intrinsic locking ({@code synchronized},
 * {@code wait} and {@code notifyAll}) around a single-slot shared buffer.
 *
 * <p>One thread runs {@link #produceData()} and another runs {@link #consumeData()}.
 * The producer blocks while the buffer is full, the consumer blocks while it is empty,
 * and each wakes the other after changing the buffer. Both workers run until their
 * thread is interrupted.
 */
public class ProducerConsumerProblem {

    /** Maximum number of elements the shared buffer may hold at once. */
    private static final int BUFFER_CAPACITY = 1;

    /** Pause, in milliseconds, a worker takes after handling one element. */
    private static final long PAUSE_MILLIS = 2000L;

    /** Monitor that guards every access to {@link #sharedBuffer}. */
    private final Object syncher = new Object();

    /**
     * Buffer shared by the workers. It is only read or modified while holding
     * {@link #syncher}, so the monitor (not {@code volatile}) provides visibility.
     */
    private final ArrayList<Integer> sharedBuffer = new ArrayList<Integer>();

    /**
     * Starts one producer thread and one consumer thread on a shared instance.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ProducerConsumerProblem object = new ProducerConsumerProblem();
        Thread producerThread = new Thread(() -> {
            object.produceData();
        }, "Producer");
        Thread consumerThread = new Thread(() -> {
            object.consumeData();
        }, "Consumer");
        producerThread.start();
        consumerThread.start();
    }

    /**
     * Repeatedly adds a random integer in {@code [0, 10)} to the shared buffer,
     * waiting whenever the buffer is full.
     *
     * <p>Returns when the calling thread is interrupted; the interrupt status is
     * restored before returning so callers can observe it.
     */
    public void produceData() {
        Random randomNumber = new Random();
        try {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (syncher) {
                    // Re-check the condition in a loop: wait() may return spuriously
                    // or after a notification that was meant for another thread.
                    while (sharedBuffer.size() >= BUFFER_CAPACITY) {
                        //System.out.println("Producer waiting...");
                        syncher.wait();
                    }
                    Integer producedElem = randomNumber.nextInt(10);
                    System.out.println("+++ Produced: " + producedElem);
                    sharedBuffer.add(producedElem);
                    // Producers and consumers share one monitor, so wake everyone and
                    // let each waiter re-test its own condition.
                    syncher.notifyAll();
                }
                // Pace the demo outside the monitor so the other worker is never
                // blocked on the lock while this thread merely sleeps.
                Thread.sleep(PAUSE_MILLIS);
            }
        } catch (InterruptedException e) {
            // Treat interruption as a stop request and preserve the flag.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Repeatedly removes the oldest element from the shared buffer and prints it,
     * waiting whenever the buffer is empty.
     *
     * <p>Returns when the calling thread is interrupted; the interrupt status is
     * restored before returning so callers can observe it.
     */
    public void consumeData() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (syncher) {
                    while (sharedBuffer.isEmpty()) {
                        //System.out.println("Consumer waiting...");
                        syncher.wait();
                    }
                    // remove(int) takes the element at index 0, i.e. FIFO order.
                    Integer consumedElem = sharedBuffer.remove(0);
                    System.out.println("--- Consumed: " + consumedElem);
                    syncher.notifyAll();
                }
                // Sleep without holding the monitor (see produceData).
                Thread.sleep(PAUSE_MILLIS);
            }
        } catch (InterruptedException e) {
            // Treat interruption as a stop request and preserve the flag.
            Thread.currentThread().interrupt();
        }
    }
}
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Producer/consumer demo that delegates all waiting and signalling to a bounded
 * {@link BlockingQueue} of capacity one.
 *
 * <p>The producer puts {@value #ITEM_COUNT} random integers into the queue and the
 * consumer takes the same number out; each worker pauses after every element.
 * Either worker stops early if its thread is interrupted.
 */
public class ProducerConsumerProblemUsingBlockingQueue {

    /** Number of elements each worker handles before finishing. */
    private static final int ITEM_COUNT = 10;

    /** Pause, in milliseconds, a worker takes after handling one element. */
    private static final long PAUSE_MILLIS = 2000L;

    /** Single-slot queue shared by the producer and the consumer. */
    final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(1);

    /**
     * Starts the consumer thread and then the producer thread on a shared queue.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ProducerConsumerProblemUsingBlockingQueue object = new ProducerConsumerProblemUsingBlockingQueue();
        Thread producerThread = new Thread(() -> {
            object.produceData(object.blockingQueue);
        }, "Producer");
        Thread consumerThread = new Thread(() -> {
            object.consumeData(object.blockingQueue);
        }, "Consumer");
        consumerThread.start();
        producerThread.start();
    }

    /**
     * Takes {@value #ITEM_COUNT} elements from the queue, printing each one and
     * pausing afterwards. {@code take()} blocks while the queue is empty.
     *
     * <p>If the thread is interrupted, the interrupt status is restored and the
     * method returns without processing the remaining elements.
     *
     * @param blockingQueue queue to take elements from
     */
    private void consumeData(BlockingQueue<Integer> blockingQueue) {
        try {
            for (int i = 0; i < ITEM_COUNT; i++) {
                System.out.println("Consumed: " + blockingQueue.take().intValue());
                Thread.sleep(PAUSE_MILLIS);
            }
        } catch (InterruptedException e) {
            // The try wraps the whole loop so an interrupt ends the work instead of
            // being cleared and ignored for the remaining iterations.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Puts {@value #ITEM_COUNT} random integers in {@code [0, 100)} into the queue,
     * printing each one and pausing afterwards. {@code put()} blocks while the
     * queue is full.
     *
     * <p>If the thread is interrupted, the interrupt status is restored and the
     * method returns without producing the remaining elements.
     *
     * @param blockingQueue queue to put elements into
     */
    private void produceData(BlockingQueue<Integer> blockingQueue) {
        Random randomObject = new Random();
        try {
            for (int i = 0; i < ITEM_COUNT; i++) {
                int randomNumber = randomObject.nextInt(100);
                System.out.println("Produced: " + randomNumber);
                blockingQueue.put(randomNumber);
                Thread.sleep(PAUSE_MILLIS);
            }
        } catch (InterruptedException e) {
            // See consumeData: stop on interrupt and keep the flag set.
            Thread.currentThread().interrupt();
        }
    }
}