Search code examples
java · multithreading · algorithm · producer-consumer · critical-section

Java Threads producer-consumer shared buffer


Implement producer consumer problem using threads in Java. Producers and consumers share a buffer, producers put elements in buffer and consumers consume elements from the shared buffer. If the buffer is full producers should wait till consumers take out elements, similarly consumers should wait till producers put elements if the buffer is empty. Your program should accept the following inputs:

m: the number of producer threads
n: the number of consumer threads
k: the size of the bounded buffer

Your code should prompt for the above inputs in that order. You can assume that a valid integer is provided by the user for each of these. You will need to spawn m Producer threads and n Consumer threads. Each producer generates 20 integers ranging between 0 and 9 and puts them in the buffer. After putting a number in the buffer, it prints its id along with the generated number. The producer sleeps for a random amount of time before repeating the number generating cycle again. Each consumer takes a number from the buffer then prints its id along with the value it got. Then, it sleeps for a random amount of time before reading from the buffer again. A sample output of the program is:

Producer #2 put: 1
Producer #1 put: 4
Consumer #3 got: 1
Producer #1 put: 3
Consumer #3 got: 4
Consumer #3 got: 3
...

I have this problem. It is clear that the buffer array has to be a shared (global) variable for both methods, because it is shared between the Producer and the Consumer. So what next? Unfortunately, I have no idea how to do this project. Does anybody have an idea?


Solution

  • import java.security.SecureRandom;
    import java.util.concurrent.*;
    
    /**
     * Bounded-buffer producer/consumer demo built on a {@link BlockingQueue}.
     *
     * <p>{@link #process()} starts the configured number of producer and consumer
     * threads around one shared {@link ArrayBlockingQueue}. Producers block while
     * the buffer is full and consumers block while it is empty. When the run time
     * is over, all worker threads are interrupted so they terminate.
     *
     * Created by Leon.H on 2016/1/13.
     */
    public class ProducerConsumer {
        /** Exclusive upper bound for both the generated values and the sleep times (ms). */
        private static final int RANDOM_BOUND = 1000;

        private final int producerNumber;
        private final int consumerNumber;
        private final int bufferSize;
        private final int seconds;

        /**
         * Stores the configuration and echoes it to standard output.
         *
         * @param producerNumber number of producer threads to start
         * @param consumerNumber number of consumer threads to start
         * @param bufferSize     capacity of the shared bounded buffer
         * @param seconds        how long {@link #process()} waits on each thread pool
         */
        public ProducerConsumer(int producerNumber, int consumerNumber, int bufferSize, int seconds) {
            this.producerNumber = producerNumber;
            this.consumerNumber = consumerNumber;
            this.bufferSize = bufferSize;
            this.seconds = seconds;
            System.out.println(this.producerNumber + ": the number of producer threads");
            System.out.println(this.consumerNumber + ": the number of consumer threads");
            // The original printed the producer label here as well (copy-paste slip).
            System.out.println(this.bufferSize + ": the size of the bounded buffer");
        }

        /**
         * Runs the demo: starts all producers and consumers, lets them work, then stops them.
         *
         * <p>Each pool is awaited for up to {@code seconds} seconds, one after the other.
         * Because the tasks loop until interrupted, both waits normally time out; the
         * workers are then interrupted via {@code shutdownNow()} so that no thread is
         * left running after this method returns control to the caller.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void process() throws InterruptedException {
            ExecutorService producerExecutorService = Executors.newFixedThreadPool(this.producerNumber);
            ExecutorService consumerExecutorService = Executors.newFixedThreadPool(this.consumerNumber);
            BlockingQueue<Integer> integers = new ArrayBlockingQueue<Integer>(this.bufferSize);

            try {
                for (int i = 0; i < this.producerNumber; i++) {
                    producerExecutorService.execute(new ProducerTask(integers));
                }

                for (int i = 0; i < this.consumerNumber; i++) {
                    consumerExecutorService.execute(new ConsumerTask(integers));
                }

                // No further tasks will be submitted.
                producerExecutorService.shutdown();
                consumerExecutorService.shutdown();

                // Let the program run: up to `seconds` for each pool.
                producerExecutorService.awaitTermination(this.seconds, TimeUnit.SECONDS);
                consumerExecutorService.awaitTermination(this.seconds, TimeUnit.SECONDS);
            } finally {
                // shutdown() alone never stops the endless task loops; interrupt them,
                // also when this thread itself was interrupted while waiting.
                producerExecutorService.shutdownNow();
                consumerExecutorService.shutdownNow();
            }
        }

        /** Puts random integers into the shared buffer until its thread is interrupted. */
        private static final class ProducerTask implements Runnable {
            private final BlockingQueue<Integer> integers;

            ProducerTask(BlockingQueue<Integer> integers) {
                this.integers = integers;
            }

            @Override
            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        int content = ThreadLocalRandom.current().nextInt(RANDOM_BOUND);
                        // put() blocks while the buffer is full; offer() would silently drop the value.
                        integers.put(content);
                        // Report only after the value is really in the buffer. The print is not
                        // atomic with the put, so a consumer's line may still appear first.
                        System.out.println("Producer #" + Thread.currentThread().getId() + " put: " + content);
                        Thread.sleep(ThreadLocalRandom.current().nextInt(RANDOM_BOUND));
                    }
                } catch (InterruptedException e) {
                    // Interruption is the stop signal: restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Takes integers from the shared buffer until its thread is interrupted. */
        private static final class ConsumerTask implements Runnable {
            private final BlockingQueue<Integer> integers;

            ConsumerTask(BlockingQueue<Integer> integers) {
                this.integers = integers;
            }

            @Override
            public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        // take() blocks while the buffer is empty.
                        Integer content = integers.take();
                        System.out.println("Consumer #" + Thread.currentThread().getId() + " get: " + content);
                        Thread.sleep(ThreadLocalRandom.current().nextInt(RANDOM_BOUND));
                    }
                } catch (InterruptedException e) {
                    // Interruption is the stop signal: restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                }
            }
        }

    }
    
    import org.junit.Test;
    
    /**
     * Smoke tests that run {@link ProducerConsumer} with a few thread and buffer
     * configurations. None of the tests asserts on output; a test passes when
     * {@code process()} returns without throwing.
     *
     * Created by Leon.H on 2016/1/13.
     */
    public class ProducerConsumerTest {
        /** One producer and one consumer sharing a single-slot buffer. */
        @Test
        public void oneProducerOneConsumerSizeOne() throws InterruptedException {
            final int producers = 1;
            final int consumers = 1;
            final int capacity = 1;
            final int runSeconds = 5;
            new ProducerConsumer(producers, consumers, capacity, runSeconds).process();
        }

        /** Two producers and three consumers sharing a three-slot buffer. */
        @Test
        public void twoProducerThreeConsumerSizeThree() throws InterruptedException {
            final int producers = 2;
            final int consumers = 3;
            final int capacity = 3;
            final int runSeconds = 5;
            new ProducerConsumer(producers, consumers, capacity, runSeconds).process();
        }

        /**
         * Many producers against few consumers on a three-slot buffer.
         * Note: despite the method name, the configuration uses 20 producers, not 200.
         */
        @Test
        public void twoHundredProducerThreeConsumerSizeThree() throws InterruptedException {
            final int producers = 20;
            final int consumers = 3;
            final int capacity = 3;
            final int runSeconds = 5;
            new ProducerConsumer(producers, consumers, capacity, runSeconds).process();
        }
    }