Search code examples
java multithreading time synchronization producer-consumer

java producer consumer issue-make thread wait until others have finished before completing


I am currently doing a producer-consumer problem with multiple threads. There are 1000 bytes available at first, and 500 have been taken up by the RAM and drivers, leaving me 500 bytes to work with for the threads. There are to be 4 producers, as follows:

  1. A thread to start a BubbleWitch2 session of 10 seconds, which requires 100 bytes of RAM per second
  2. A thread to start a Spotify stream of 20 seconds, which requires 250 bytes of RAM per second.
  3. System and management threads, which, together, require 50 bytes of RAM per second, and execute for a random length of time, once invoked.
  4. A thread to install a new security update of 2 KB, which will be stored to disk, and requires 150 bytes of RAM per second while installing. Assume sufficient disk capacity in the system to support this thread.

The program is meant to cease execution when the security update has finished. Ideally, this should be achieved without setting priorities on the threads. It was working earlier, but now when I run the program the security thread finishes in the middle and Spotify finishes last. Are there any mistakes that may be causing this? I have included my code below. I have yet to assign all of the byte sizes to the threads and buffer.

My main method.

/**
 * Created by User on 10/08/2014.
 *
 * Entry point of the simulation: one consumer ({@code Processor}) and four
 * producers share a single {@code Buffer}. The security update must be the
 * last producer to run, and the program exits once it has finished.
 */
public class ProducerConsumerTest {
    /**
     * Runs the three regular producers to completion, then runs the security
     * update, then terminates the JVM.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while
     *                              waiting for a producer to finish
     */
    public static void main(String[] args) throws InterruptedException {
        Buffer c = new Buffer();
        BubbleWitch2 p1 = new BubbleWitch2(c, 1);
        Processor c1 = new Processor(c, 2);
        Spotify p2 = new Spotify(c, 3);
        SystemManagement p3 = new SystemManagement(c, 4);
        securityUpdate p4 = new securityUpdate(c, 5, p1, p2, p3);

        p1.setName("BubbleWitch2 ");
        p2.setName("Spotify ");
        p3.setName("System Management ");
        p4.setName("Security Update ");

        c1.start();
        p1.start();
        p2.start();
        p3.start();

        // Thread.join() returns immediately for a thread that has not been
        // started, so the ordering has to be enforced here, after start():
        // wait for every regular producer before the update is launched.
        p1.join();
        p2.join();
        p3.join();

        p4.start();
        p4.join();

        // The consumer may still be blocked waiting for more values, so the
        // JVM is terminated explicitly once the update is done.
        System.exit(0);
    }
}

My buffer/cubbyhole class

/**
 * Created by User on 10/08/2014.
 *
 * A one-slot hand-off buffer guarded by the object's monitor: {@link #put(int)}
 * blocks while the slot is full and {@link #get()} blocks while it is empty.
 */
class Buffer {
    private int contents;
    private boolean available = false;

    /**
     * Removes and returns the stored value, waiting until one is available.
     * If the calling thread is interrupted while waiting, the wait continues
     * and the interrupt status is restored before returning.
     *
     * @return the value most recently stored by {@link #put(int)}
     */
    public synchronized int get() {
        boolean interrupted = false;
        try {
            while (!available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the request: re-interrupting
                    // here would make wait() throw again immediately.
                    interrupted = true;
                }
            }
            available = false;
            notifyAll();
            return contents;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Stores a value, waiting until the slot is empty. If the calling thread
     * is interrupted while waiting, the wait continues and the interrupt
     * status is restored before returning.
     *
     * @param value the value to hand to the next {@link #get()} caller
     */
    public synchronized void put(int value) {
        boolean interrupted = false;
        try {
            while (available) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            contents = value;
            available = true;
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

My Consumer class

/**
 * Consumer thread: pulls a fixed number of values out of the shared buffer
 * and reports each one on standard output.
 */
class Processor extends Thread {
    private Buffer cubbyhole;
    private int number;

    /**
     * @param c      buffer to consume from
     * @param number identifier printed with every consumed value
     */
    public Processor(Buffer c, int number) {
        this.cubbyhole = c;
        this.number = number;
    }

    /** Consumes exactly 60 values, printing one line per value. */
    @Override
    public void run() {
        int remaining = 60;
        while (remaining > 0) {
            int received = cubbyhole.get();
            System.out.println("Processor #" + number + " got: " + received);
            remaining--;
        }
    }
}

My spotify producer class

/**
 * Producer thread simulating a 20-step Spotify stream: one value is put into
 * the shared buffer per step, followed by a one-second pause.
 */
class Spotify extends Thread {
    private Buffer buffer;
    private int number;
    private int bytes;

    /**
     * @param c      buffer to produce into
     * @param number identifier printed after the thread name
     */
    public Spotify(Buffer c, int number) {
        this.buffer = c;
        this.number = number;
    }

    /** Produces the values 0..19, then prints the completion banner. */
    @Override
    public void run() {
        int step = 0;
        while (step < 20) {
            buffer.put(step);
            System.out.println(getName() + number + " put: " + step);
            pauseOneSecond();
            step++;
        }
        final String rule = "*****************************";
        System.out.println(rule);
        System.out.println("Spotify has finished executing.");
        System.out.println(rule);
    }

    /** Sleeps for one second; an interruption simply ends the pause early. */
    private void pauseOneSecond() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // deliberately ignored, the loop carries on with the next step
        }
    }
}

My bubblewitch producer class

import java.lang.*;
import java.lang.System;
/**
 * Created by User on 10/08/2014.
 *
 * Producer thread simulating a 10-step BubbleWitch2 session: each step puts
 * its index into the shared buffer and then pauses for one second.
 */
class BubbleWitch2 extends Thread {
    private Buffer buffer;
    private int number;
    private int bytes;

    /**
     * @param c      buffer to produce into
     * @param number identifier printed after the thread name
     */
    public BubbleWitch2(Buffer c, int number) {
        this.buffer = c;
        this.number = number;
    }

    /** Produces the values 0..9, then prints the completion banner. */
    @Override
    public void run() {
        final int steps = 10;
        for (int tick = 0; tick != steps; ++tick) {
            buffer.put(tick);
            String report = getName() + number + " put: " + tick;
            System.out.println(report);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // deliberately ignored, continue with the next step
            }
        }
        String[] closing = {
            "*****************************",
            "BubbleWitch2 has finished executing.",
            "*****************************"
        };
        for (String line : closing) {
            System.out.println(line);
        }
    }
}

My system Management producer class

  /**
   * Producer thread simulating system and management work that runs for a
   * random number of one-second steps, chosen once per instance.
   */
  class SystemManagement extends Thread {
      private Buffer buffer;
      private int number, min = 1, max = 15;
      // Uniform in [min, max]. The previous formula omitted "+ min", which
      // produced 0..13 and allowed a run of zero steps.
      private int loopCount = min + (int) (Math.random() * (max - min + 1));

      /**
       * @param c      buffer to produce into
       * @param number identifier printed after the thread name
       */
      public SystemManagement(Buffer c, int number) {
          buffer = c;
          this.number = number;
      }

      /** Produces the values 0..loopCount-1, then prints the completion banner. */
      @Override
      public void run() {
          for (int i = 0; i < loopCount; i++) {
              buffer.put(i);
              System.out.println(getName() + this.number
                      + " put: " + i);
              try {
                  sleep(1000);
              } catch (InterruptedException e) {
                  // deliberately ignored: an interruption only shortens the pause
              }
          }
          System.out.println("*****************************");
          System.out.println("System Management has finished executing.");
          System.out.println("*****************************");
      }
  }

My security update class

/**
 * Created by User on 14/08/2014.
 */
import java.lang.*;
import java.lang.System;

/**
 * Created by User on 11/08/2014.
 *
 * Producer thread simulating the security update. It must run last, so its
 * {@link #run()} first waits for the three other producers to terminate.
 */
class securityUpdate extends Thread {
    private Buffer buffer;
    private int number;
    private int bytes = 150;
    private int process = 0;
    private final BubbleWitch2 bubbleWitch2;
    private final Spotify spotify;
    private final SystemManagement systemManagement;

    /**
     * Only records its collaborators. Waiting happens in {@link #run()}:
     * joining here would be a no-op on threads that have not been started,
     * and would block the constructing thread on threads that have.
     *
     * @param c                buffer to produce into
     * @param number           identifier printed after the thread name
     * @param bubbleWitch2     producer that must finish before the update runs
     * @param spotify          producer that must finish before the update runs
     * @param systemManagement producer that must finish before the update runs
     * @throws InterruptedException never thrown; kept in the signature so
     *                              existing callers compile unchanged
     */
    public securityUpdate (Buffer c, int number, BubbleWitch2 bubbleWitch2, Spotify spotify, SystemManagement systemManagement) throws InterruptedException {
        buffer = c;
        this.number = number;
        this.bubbleWitch2 = bubbleWitch2;
        this.spotify = spotify;
        this.systemManagement = systemManagement;
    }

    /**
     * Waits for the other producers, then produces the values 0..9 at one per
     * second and prints the completion banner. The other producers must have
     * been started before this thread is started, because joining an
     * unstarted thread returns immediately.
     */
    @Override
    public void run() {
        try {
            bubbleWitch2.join();
            spotify.join();
            systemManagement.join();
        } catch (InterruptedException e) {
            // Asked to stop before the update could begin: preserve the
            // interrupt status and give up instead of running out of order.
            Thread.currentThread().interrupt();
            return;
        }

        for (int i = 0; i < 10; i++) {
            buffer.put(i);
            System.out.println(getName() + this.number
                    + " put: " + i);
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                // deliberately ignored: an interruption only shortens the pause
            }
        }
        System.out.println("*****************************");
        System.out.println("Security Update has finished executing.");
        System.out.println("*****************************");
    }
}

I want to be able to make it run last without hardcoding a different number in the count as I will be required to calculate in code how long it takes to run at a size of 2000 bytes at 150 bytes per second, which would make hardcoding irrelevant. Has anyone any ideas?


Solution

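  • Try the modified version of your code below. It uses the executor framework together with a `BlockingQueue` and a `CountDownLatch`, which is simpler (the classes used come from `java.util.concurrent` and must be imported):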

    /**
     * Entry point of the simulation: one consumer and four producers share a
     * one-slot blocking queue. A latch counted down by the three regular
     * producers holds the security update back until they have all finished.
     */
    public class ProducerConsumerTest {
        /**
         * Submits all tasks, waits for the security update to complete, then
         * stops the pool and terminates the JVM.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted
         *                              while waiting
         */
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Integer> c = new ArrayBlockingQueue<Integer>(1);
            CountDownLatch doneSignal = new CountDownLatch(3);

            Processor c1 = new Processor(c, 2, doneSignal);

            BubbleWitch2 p1 = new BubbleWitch2(c, 1, doneSignal);
            Spotify p2 = new Spotify(c, 3, doneSignal);
            SystemManagement p3 = new SystemManagement(c, 4, doneSignal);
            SecurityUpdate p4 = new SecurityUpdate(c, 5, doneSignal);

            p1.setName("BubbleWitch2 ");
            p2.setName("Spotify ");
            p3.setName("System Management ");
            p4.setName("Security Update ");

            ExecutorService exec = Executors.newCachedThreadPool();
            exec.submit(c1);
            exec.submit(p1);
            exec.submit(p2);
            exec.submit(p3);

            Future<?> securityFuture = exec.submit(p4);

            try {
                // Blocks until the security update task has completed; a
                // single call is enough, no polling loop is needed.
                securityFuture.get();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } finally {
                // Only the consumer can still be running at this point.
                // Interrupt it and wait briefly; awaitTermination returns
                // true once the pool has terminated, so it must not be used
                // as a "keep looping" condition.
                exec.shutdownNow();
                exec.awaitTermination(1000, TimeUnit.MILLISECONDS);
            }

            System.exit(0);
        }
    }
    /**
     * Consumer: takes values from the shared queue and prints each one until
     * the executing thread is interrupted.
     */
    class Processor extends Thread {
        private BlockingQueue<Integer> cubbyhole;
        private int number;
        private CountDownLatch doneSignal;

        /**
         * @param c          queue to consume from
         * @param number     identifier printed with every consumed value
         * @param doneSignal stored but not used by the consumer
         */
        public Processor(BlockingQueue<Integer> c, int number,CountDownLatch doneSignal) {
            cubbyhole = c;
            this.number = number;
            this.doneSignal = doneSignal;
        }

        /**
         * Consumes until interrupted. An interrupt ends the loop instead of
         * being swallowed, so a pool's shutdownNow() can stop the consumer
         * and no stale value is printed after a failed take.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    int value = cubbyhole.take();
                    System.out.println("Processor #"
                            + this.number
                            + " got: " + value);
                }
            } catch (InterruptedException e) {
                // Restore the status for whoever owns the executing thread.
                Thread.currentThread().interrupt();
            }
        }
    }
    
    /**
     * Producer simulating a Spotify stream: puts the values 0..19 into the
     * shared queue and signals the latch when it stops.
     */
    class Spotify extends Thread {
        private BlockingQueue<Integer> buffer;
        private int number;
        private int bytes;
        private CountDownLatch doneSignal;

        /**
         * @param c          queue to produce into
         * @param number     identifier printed after the thread name
         * @param doneSignal latch counted down once when this producer stops
         */
        public Spotify(BlockingQueue<Integer> c, int number, CountDownLatch doneSignal) {
            buffer = c;
            this.number = number;
            this.doneSignal = doneSignal;
        }

        /**
         * Produces 20 values and prints the completion banner. If interrupted
         * while blocked on the queue, production stops at once (no put is
         * reported that did not happen) and the interrupt status is restored.
         * The latch is counted down in every case so waiters are not stranded.
         */
        @Override
        public void run() {
            try {
                for (int i = 0; i < 20; i++) {
                    buffer.put(i);
                    System.out.println(getName() + this.number
                            + " put: " + i);
                }
                System.out.println("*****************************");
                System.out.println("Spotify has finished executing.");
                System.out.println("*****************************");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                doneSignal.countDown();
            }
        }
    }
    
    /**
     * Producer simulating a BubbleWitch2 session: puts the values 0..9 into
     * the shared queue and signals the latch when it stops.
     */
    class BubbleWitch2 extends Thread {
        private BlockingQueue<Integer> buffer;
        private int number;
        private int bytes;
        private CountDownLatch doneSignal;

        /**
         * @param c          queue to produce into
         * @param number     identifier printed after the thread name
         * @param doneSignal latch counted down once when this producer stops
         */
        public BubbleWitch2(BlockingQueue<Integer> c, int number, CountDownLatch doneSignal) {
            buffer = c;
            this.number = number;
            this.doneSignal = doneSignal;
        }

        /**
         * Produces 10 values and prints the completion banner. An interrupt
         * received while blocked on the queue ends production immediately
         * and is re-asserted on the executing thread; the latch is counted
         * down whether or not the run completed.
         */
        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    buffer.put(i);
                    System.out.println(getName() + this.number
                            + " put: " + i);
                }
                System.out.println("*****************************");
                System.out.println("BubbleWitch2 has finished executing.");
                System.out.println("*****************************");
            } catch (InterruptedException e1) {
                Thread.currentThread().interrupt();
            } finally {
                doneSignal.countDown();
            }
        }
    }
    
    /**
     * Producer simulating system and management work: puts a random number
     * of values into the shared queue and signals the latch when it stops.
     */
    class SystemManagement extends Thread {
        private BlockingQueue<Integer> buffer;
        private int number, min = 1, max = 15;
        // Uniform in [min, max]. The previous formula omitted "+ min", which
        // produced 0..13 and allowed a run of zero steps.
        private int loopCount = min + (int) (Math.random() * (max - min + 1));
        private CountDownLatch doneSignal;

        /**
         * @param c          queue to produce into
         * @param number     identifier printed after the thread name
         * @param doneSignal latch counted down once when this producer stops
         */
        public SystemManagement(BlockingQueue<Integer> c, int number, CountDownLatch doneSignal) {
            buffer = c;
            this.number = number;
            this.doneSignal = doneSignal;
        }

        /**
         * Produces the values 0..loopCount-1 and prints the completion
         * banner. An interrupt while blocked on the queue stops production
         * and is restored on the executing thread; the latch is always
         * counted down.
         */
        @Override
        public void run() {
            try {
                for (int i = 0; i < loopCount; i++) {
                    buffer.put(i);
                    System.out.println(getName() + this.number
                            + " put: " + i);
                }
                System.out.println("*****************************");
                System.out.println("System Management has finished executing.");
                System.out.println("*****************************");
            } catch (InterruptedException e1) {
                Thread.currentThread().interrupt();
            } finally {
                doneSignal.countDown();
            }
        }
    }
    
    /**
     * Producer simulating the security update. It waits on the latch until
     * the other producers have signalled, then puts the values 0..9 into the
     * shared queue.
     */
    class SecurityUpdate extends Thread {
        private BlockingQueue<Integer> buffer;
        private int number;
        private int bytes = 150;
        private int process = 0;
        private CountDownLatch doneSignal;

        /**
         * @param c          queue to produce into
         * @param number     identifier printed after the thread name
         * @param doneSignal latch that reaches zero once the other producers
         *                   have stopped
         */
        public SecurityUpdate (BlockingQueue<Integer> c, int number, CountDownLatch doneSignal) {
            buffer = c;
            this.number = number;
            this.doneSignal = doneSignal;
        }

        /**
         * Waits for the latch, produces 10 values and prints the completion
         * banner. If interrupted while waiting or while blocked on the queue,
         * the update is abandoned rather than run out of order, and the
         * interrupt status is restored.
         */
        @Override
        public void run() {
            try {
                doneSignal.await();
                for (int i = 0; i < 10; i++) {
                    buffer.put(i);
                    System.out.println(getName() + this.number
                            + " put: " + i);
                }
                System.out.println("*****************************");
                System.out.println("Security Update has finished executing.");
                System.out.println("*****************************");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    

    Do let me know if you have any questions