Search code examples
javamultithreadingproducer-consumer

Producer Consumer problem one producer many consumers java


I'm struggling with Producer Consumer problem. The problem exists when I run one producer and more than one consumers. Only one consumer consumes buffer. As far as I read it can be problem with my buffer implementation. How can I solve this? If it's not a buffer what I do wrong?


class Producer extends Thread {
    private final Buffer _buf;
    private final int maxSize;
    private final String name;

    public Producer(Buffer _buf, int maxSize, String name) {
        super(name);
        this._buf = _buf;
        this.maxSize = maxSize;
        this.name = name;
    }

    @Override
    public void run() {
        synchronized (_buf) {
            for (; ; ) {
                while (_buf.isFull()) {
                    try {
                        System.out.println("Buffer is full, " + "Producer thread waiting for " + "consumer to take something from buffer");
                        _buf.wait();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                Random random = new Random();
                int i = random.nextInt();
                System.out.println(this.name + " producing value " + i);
                _buf.put(i);
                _buf.notifyAll();
                try {
                    Thread.sleep(200);
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
            }
        }
    }
}

class Consumer extends Thread {
    private final Buffer _buf;
    private final int maxSize;
    private final String name;


    public Consumer(Buffer _buf, int maxSize, String name) {
        super(name);
        this._buf = _buf;
        this.maxSize = maxSize;
        this.name = name;
    }

    @Override
    public void run() {
        synchronized (_buf) {
            for (; ; ) {
                while (_buf.isEmpty()) {
                    System.out.println("Buffer is empty," + "Consumer thread is waiting" + " for producer thread to put something in buffer");
                    try {
                        _buf.wait();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                System.out.println(this.name + ": consuming value " + _buf.get());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
                _buf.notifyAll();
            }
        }
    }
}


class Buffer {
    public synchronized void put(int i) {
        // check for queue overflow
        if (isFull()) {
            System.out.println("Overflow\nProgram Terminated");
            System.exit(1);
        }

        System.out.println("Inserting " + i);

        rear = (rear + 1) % capacity;
        arr[rear] = i;
        count++;
        notifyAll();

    }

    public synchronized int get() {
        if (isEmpty()) {
            System.out.println("Underflow\nProgram Terminated");
            System.exit(1);
        }
        int result = arr[front];

        System.out.println("Removing " + arr[front]);

        front = (front + 1) % capacity;
        count--;
        notifyAll();
        return result;
    }


    private final int[] arr;      // array to store queue elements
    private int front;      // front points to the front element in the queue
    private int rear;       // rear points to the last element in the queue
    private final int capacity;   // maximum capacity of the queue
    private int count;      // current size of the queue

    // Constructor to initialize a buffer queue
    Buffer(int size) {
        arr = new int[size];
        capacity = size;
        front = 0;
        rear = -1;
        count = 0;
    }

    public int size() {
        return count;
    }

    public Boolean isEmpty() {
        return (size() == 0);
    }

    public Boolean isFull() {
        return (size() == capacity);
    }
}

public class PKmain {

    public static void main(String[] args) {
        int maxSize = 100;
        Buffer buffer = new Buffer(10);

        Thread producer = new Producer(buffer, maxSize, "PRODUCER");
        Thread consumer1 = new Consumer(buffer, maxSize, "CONSUMER 1");
        Thread consumer2 = new Consumer(buffer, maxSize, "CONSUMER 2");
        Thread consumer3 = new Consumer(buffer, maxSize, "CONSUMER 3");
        Thread consumer4 = new Consumer(buffer, maxSize, "CONSUMER 4");
        Thread consumer5 = new Consumer(buffer, maxSize, "CONSUMER 5");
        Thread consumer6 = new Consumer(buffer, maxSize, "CONSUMER 6");

        producer.start();
        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();
        consumer5.start();
        consumer6.start();
    }
}

And here is the console output:

Inserting -1893944
PRODUCER producing value 1150242252
Inserting 1150242252
PRODUCER producing value 957139043
Inserting 957139043
PRODUCER producing value -806406909
Inserting -806406909
PRODUCER producing value 1701947892
Inserting 1701947892
PRODUCER producing value -174867893
Inserting -174867893
PRODUCER producing value 1272708996
Inserting 1272708996
PRODUCER producing value -1522880833
Inserting -1522880833
PRODUCER producing value -1041643777
Inserting -1041643777
PRODUCER producing value 1741137093
Inserting 1741137093
Buffer is full, Producer thread waiting for consumer to take something from buffer
Removing -1893944
CONSUMER 6: consuming value -1893944
Removing 1150242252
CONSUMER 6: consuming value 1150242252
Removing 957139043
CONSUMER 6: consuming value 957139043
Removing -806406909
CONSUMER 6: consuming value -806406909
Removing 1701947892
CONSUMER 6: consuming value 1701947892
Removing -174867893
CONSUMER 6: consuming value -174867893
Removing 1272708996
CONSUMER 6: consuming value 1272708996
Removing -1522880833
CONSUMER 6: consuming value -1522880833
Removing -1041643777
CONSUMER 6: consuming value -1041643777
Removing 1741137093
CONSUMER 6: consuming value 1741137093
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
Buffer is empty,Consumer thread is waiting for producer thread to put something in buffer
PRODUCER producing value -1656771306
Inserting -1656771306
PRODUCER producing value 146381233
Inserting 146381233
PRODUCER producing value -303301670
Inserting -303301670
...

Thanks!!!


Solution

  • The reason why your Consumers can't process the buffer in parallel is synchronized (_buf) section. So while the lock is acquired by single Consumer none of other Consumers can process. As a solution I suggest get rid of locking on Consumer and Producer side in respect of locking inside buffer operations (which is present in your code too). Code can look like this:

    Producer part:

        @Override
        public void run() {
            for (; ; ) {
    
                Random random = new Random();
                int i = random.nextInt();
                System.out.println(this.name + " producing value " + i);
                _buf.put(i);
                try {
                    Thread.sleep(200);
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
            }
    
        }
    

    Consumer part:

        @Override
        public void run() {
    
            for (; ; ) {
                System.out.println(this.name + ": consuming value " + _buf.get());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException exception) {
                    exception.printStackTrace();
                }
            }
        }
    

    Buffer part:

         public synchronized void put(int i) {
            while (isFull()) {
                try {
                    wait();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
    
            rear = (rear + 1) % capacity;
            arr[rear] = i;
            count++;
            notifyAll();
        }
    
        public synchronized int get() {
            while (isEmpty()) {
                try {
                    wait();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            int result = arr[front];
    
            front = (front + 1) % capacity;
            count--;
            notifyAll();
            return result;
        }