Search code examples
javamultithreadingconcurrencywaitnotify

Single Producer Multiple Consumer Java


I am new to Java Concurrency and trying to achieve/implement Single Producer[P1] and Multiple Consumer [C1,C2,C3].

The idea is producer [P1] puts in the value and consumers C1,C2,C3 all runs their task to read the value individually as put in by P1. Once C1,C2,C3 reads the values, P1 again puts a new data. Then C1,C2,C3 reads data and this loop goes on.

Wait Notify works fine for Single Producer Single Consumer, but in this case of Single Producer Multiple Consumer wait notify concept doesn't look to be good strategy. How should I approach this problem.


Solution

  • Thanks to @Ivan and @Andreas.

    @Ivan - In his comment made me understand how Producer Consumer pattern behaves. @Andreas - In his Comment suggested the usage of Phaser. (I used Cyclic Barrier instead since my number of registered threads does not vary dynamically)

    With both their comments sharing the below sample code. Please do suggest improvisation if there any or a better way to handle this.

    Main Class

        public static void main(String[] args)
        {
            SharedSpace sharedSpace = new SharedSpace(new LinkedBlockingQueue<Integer>(1));
            new Thread(new Producer(sharedSpace)).start();
    
    
            Consumer consumerRunnable = new Consumer(sharedSpace);
            new Thread(consumerRunnable).start();
    
            CyclicBarrier barrier = new CyclicBarrier(3,consumerRunnable);
    
            new Thread(new EndUser(barrier,consumerRunnable)).start();
            new Thread(new EndUser(barrier,consumerRunnable)).start();
            new Thread(new EndUser(barrier,consumerRunnable)).start();
        }
    

    Producer

    private SharedSpace sharedSpace;
    
    public Producer(SharedSpace sharedSpace) {
        super();
        this.sharedSpace = sharedSpace;
    }
    
    public SharedSpace getSharedSpace() {
        return sharedSpace;
    }
    
    public void setSharedSpace(SharedSpace sharedSpace) {
        this.sharedSpace = sharedSpace;
    }
    
    @Override
    public void run() {
    
        for(int i=0;i<3;i++)
        {
            int value = (int) (Math.random()*30);
            sharedSpace.addValue(value);
        }
    
    
    }
    

    Queue Shared by Producer and Consumer

    private BlockingQueue<Integer> queue;
    
        public SharedSpace(BlockingQueue<Integer> queue) {
            super();
            this.queue = queue;
        }
    
        public BlockingQueue<Integer> getQueue() {
            return queue;
        }
    
        public void setQueue(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }
    
        public void addValue(int value)
        {
            try {
                queue.put(value);
                System.out.println(System.nanoTime()+" Producer added value "+value);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public int getValue() throws InterruptedException
        {
                return queue.take();
    
    
        }
    

    Consumer

    private SharedSpace sharedSpace;
    
        private Integer value;
    
        public Consumer(SharedSpace sharedSpace) {
            super();
            this.sharedSpace = sharedSpace;
        }
    
        public SharedSpace getSharedSpace() {
            return sharedSpace;
        }
    
        public void setSharedSpace(SharedSpace sharedSpace) {
            this.sharedSpace = sharedSpace;
        }
    
        public Integer getValue() {
            return value;
        }
    
        public void setValue(Integer value) {
            this.value = value;
        }
    
        @Override
        public void run() 
        {
    
            try {
                setValue(sharedSpace.getValue());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    

    EndUser

    CyclicBarrier barrier;
    
    Consumer consumer;
    
    public EndUser(CyclicBarrier barrier) {
        super();
        this.barrier = barrier;
    }
    
    public EndUser(CyclicBarrier barrier, Consumer consumer) {
        super();
        this.barrier = barrier;
        this.consumer = consumer;
    }
    
    
    public Consumer getConsumer() {
        return consumer;
    }
    
    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }
    
    
    public CyclicBarrier getBarrier() {
        return barrier;
    }
    
    
    public void setBarrier(CyclicBarrier barrier) {
        this.barrier = barrier;
    }
    
    
    @Override
    public void run() {
        try
        {
            while(true)
            {
                System.out.println(consumer.getValue());
                barrier.await();
            }
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    
    }
    

    Output [Consumer doesn't read from Producer unless all EndUser has taken their Data]

    Producer added value 24
    Producer added value 10
    24
    24
    24
    10
    10
    Producer added value 0
    10
    0
    0
    0