I am new to Java concurrency and am trying to implement a single producer [P1] with multiple consumers [C1, C2, C3].
The idea is that producer P1 puts in a value, and consumers C1, C2, and C3 each run their own task to read that same value. Once C1, C2, and C3 have all read the value, P1 puts in new data, C1, C2, and C3 read it, and the cycle repeats.
wait/notify works fine for a single producer and a single consumer, but with a single producer and multiple consumers it does not look like a good strategy. How should I approach this problem?
Thanks to @Ivan and @Andreas.
@Ivan's comment helped me understand how the producer-consumer pattern behaves. @Andreas's comment suggested using a Phaser. (I used a CyclicBarrier instead, since my number of registered threads does not vary dynamically.)
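To illustrate the CyclicBarrier behaviour this relies on, here is a minimal sketch (not part of my actual code). The class name `BarrierActionSketch` and the method `runRounds` are made up for the example. It shows that the barrier action runs once per cycle, after all parties have arrived and before any of them is released.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class, only meant to show barrier-action ordering.
public class BarrierActionSketch {

    // Runs `rounds` barrier cycles with `parties` worker threads and returns the event log.
    static List<String> runRounds(int parties, int rounds) throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        // The action runs once per cycle, in the last thread to arrive,
        // before any waiting thread is released.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> log.add("action"));

        List<Thread> workers = new ArrayList<>();
        for (int p = 0; p < parties; p++) {
            Thread t = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        log.add("work");
                        barrier.await(); // wait for the other parties
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [work, work, work, action, work, work, work, action]
        System.out.println(runRounds(3, 2));
    }
}
```

A Phaser would probably be the better fit if threads had to register and deregister while the program runs; with a fixed count of three, the barrier seems sufficient.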
Based on both their comments, I am sharing the sample code below. Please suggest improvements, if there are any, or a better way to handle this.
Main Class
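import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {
    public static void main(String[] args) {
        // Capacity 1: the producer blocks until the previous value has been taken.
        SharedSpace sharedSpace = new SharedSpace(new LinkedBlockingQueue<Integer>(1));
        new Thread(new Producer(sharedSpace)).start();
        // Run the consumer once up front to fetch the first value.
        Consumer consumerRunnable = new Consumer(sharedSpace);
        new Thread(consumerRunnable).start();
        // The consumer is also the barrier action: it fetches the next value
        // each time all three EndUser threads reach the barrier.
        CyclicBarrier barrier = new CyclicBarrier(3, consumerRunnable);
        new Thread(new EndUser(barrier, consumerRunnable)).start();
        new Thread(new EndUser(barrier, consumerRunnable)).start();
        new Thread(new EndUser(barrier, consumerRunnable)).start();
    }
}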
Producer
public class Producer implements Runnable {
    private SharedSpace sharedSpace;

    public Producer(SharedSpace sharedSpace) {
        super();
        this.sharedSpace = sharedSpace;
    }

    public SharedSpace getSharedSpace() {
        return sharedSpace;
    }

    public void setSharedSpace(SharedSpace sharedSpace) {
        this.sharedSpace = sharedSpace;
    }

    @Override
    public void run() {
        // Produce three random values in the range [0, 30).
        for (int i = 0; i < 3; i++) {
            int value = (int) (Math.random() * 30);
            sharedSpace.addValue(value);
        }
    }
}
Queue Shared by Producer and Consumer
import java.util.concurrent.BlockingQueue;

public class SharedSpace {
    private BlockingQueue<Integer> queue;

    public SharedSpace(BlockingQueue<Integer> queue) {
        super();
        this.queue = queue;
    }

    public BlockingQueue<Integer> getQueue() {
        return queue;
    }

    public void setQueue(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    // Blocks while the queue is full.
    public void addValue(int value) {
        try {
            queue.put(value);
            System.out.println(System.nanoTime() + " Producer added value " + value);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    // Blocks while the queue is empty.
    public int getValue() throws InterruptedException {
        return queue.take();
    }
}
Consumer
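public class Consumer implements Runnable {
    private SharedSpace sharedSpace;
    // volatile so that EndUser threads reliably see a value written by another thread.
    private volatile Integer value;

    public Consumer(SharedSpace sharedSpace) {
        super();
        this.sharedSpace = sharedSpace;
    }

    public SharedSpace getSharedSpace() {
        return sharedSpace;
    }

    public void setSharedSpace(SharedSpace sharedSpace) {
        this.sharedSpace = sharedSpace;
    }

    public Integer getValue() {
        return value;
    }

    public void setValue(Integer value) {
        this.value = value;
    }

    // Takes the next value from the queue and publishes it to the EndUser threads.
    @Override
    public void run() {
        try {
            setValue(sharedSpace.getValue());
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}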
EndUser
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class EndUser implements Runnable {
    CyclicBarrier barrier;
    Consumer consumer;

    public EndUser(CyclicBarrier barrier) {
        super();
        this.barrier = barrier;
    }

    public EndUser(CyclicBarrier barrier, Consumer consumer) {
        super();
        this.barrier = barrier;
        this.consumer = consumer;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

    public CyclicBarrier getBarrier() {
        return barrier;
    }

    public void setBarrier(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // Read the current value, then wait until the other EndUsers have read it too.
                System.out.println(consumer.getValue());
                barrier.await();
            }
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
Output [the Consumer does not read from the Producer until every EndUser has taken its data]
Producer added value 24
Producer added value 10
24
24
24
10
10
Producer added value 0
10
0
0
0
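One variation I am considering, as a sketch only and under my own assumptions: let each reader wait at the barrier first and read afterwards, so that the barrier action alone fetches every value, including the first one. That would remove the separately started Consumer thread and the chance that an EndUser reads before the first value has arrived. The names `BroadcastSketch` and `broadcast` are made up for this example, and the number of rounds is fixed so that the program terminates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical condensed version of the pattern above.
public class BroadcastSketch {

    // Publishes each value to `readers` threads and returns what each reader saw.
    static List<List<Integer>> broadcast(int[] values, int readers) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        AtomicInteger current = new AtomicInteger();

        // Barrier action: once every reader has arrived (so all of them are done
        // with the previous value), take the next value and publish it.
        CyclicBarrier barrier = new CyclicBarrier(readers, () -> {
            try {
                current.set(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e); // breaks the barrier
            }
        });

        Thread producer = new Thread(() -> {
            try {
                for (int v : values) {
                    queue.put(v); // blocks until the previous value was taken
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<List<Integer>> seen = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int r = 0; r < readers; r++) {
            List<Integer> mine = Collections.synchronizedList(new ArrayList<>());
            seen.add(mine);
            Thread t = new Thread(() -> {
                try {
                    for (int i = 0; i < values.length; i++) {
                        barrier.await();         // wait until the next value is published
                        mine.add(current.get()); // every reader sees the same value
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            threads.add(t);
            t.start();
        }

        producer.join();
        for (Thread t : threads) {
            t.join();
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [[24, 10, 0], [24, 10, 0], [24, 10, 0]]
        System.out.println(broadcast(new int[] {24, 10, 0}, 3));
    }
}
```

Because the action only runs when all readers are back at the barrier, a fast reader cannot cause the value to be replaced while a slower reader is still using it.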