Implementing the monitor example for multithreading programming in java I managed to write this:
import java.util.*;
import java.util.concurrent.*;
class Monitor{
Semaphore s_prod; // Production semaphore
Semaphore s_rec; // Recolection semaphore
int capacidad; // capacity of the queue
ArrayList<Integer> cola; // my queue
Monitor(int n){
this.capacidad = n;
this.cola = new ArrayList<Integer>(this.capacidad);
this.s_prod = new Semaphore(this.capacidad);
this.s_rec = new Semaphore(0);
}
public void Producir(int n){ // Producing
try {
this.s_prod.acquire();
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
this.cola.add(n);
System.out.println("|Prod:" + n + " | " + cola); // Printing production
this.s_rec.release();
}
public int Recolectar(){ // Recolecting
try {
this.s_rec.acquire();
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
int temp = this.cola.get(0);
this.cola.remove(0);
System.out.println("|Rec:" + temp + " | " + cola); // Printing recolection
this.s_prod.release();
return temp;
}
}
class Productor implements Runnable{ // Productor
Thread t;
Monitor m;
int a, b; // Produces number from 'a' to 'b'
boolean finish; // flag that indicates the end of production
Productor(String nombre, Monitor m, int i, int j){
this.t = new Thread(this, "Productor " + nombre);
this.a = i;
this.b = j;
this.m = m;
finish = false;
}
public void run(){
for(int i = a; i <= b; i++){
m.Producir(i);
try{
Thread.sleep(50);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
this.finish = true;
}
public void empezar(){
this.t.start();
}
}
class Recolector implements Runnable{ // Consumer
Thread t;
Monitor m;
Recolector(String nombre, Monitor m){
this.t = new Thread(this, "Recolector " + nombre);
this.m = m;
}
public void run(){
/*
while all the producers are still working and, even if they finished, the queue
needs to be emptied
*/
while(!(Checking.product_finished && m.cola.size() == 0)){
m.Recolectar();
try{
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
public void empezar(){
this.t.start();
}
}
class Checking implements Runnable{ // Threads that checks if the producers finished
public volatile static boolean product_finished;
Productor[] p_array;
Checking(Productor[] prods){
p_array = prods;
new Thread(this).start();
}
public void run(){
boolean flag = true;
while(flag){
flag = false;
for(Productor p:p_array){
if(!p.finish){
flag = true;
}
}
}
System.out.println("Produccion finalizada!");
product_finished = true;
}
}
public class Test{
public static void main(String args[]){
Monitor m = new Monitor(3); // monitor for queue of capacity=3
Productor p1 = new Productor("P1", m, 10, 20);
Productor p2 = new Productor("P2", m, -50, -40);
Recolector r1 = new Recolector("R1", m);
Recolector r2 = new Recolector("R1", m);
Recolector r3 = new Recolector("R1", m);
Productor[] p_array = new Productor[2];
p_array[0] = p1;
p_array[1] = p2;
Checking ch = new Checking(p_array);
p1.empezar();
p2.empezar();
r1.empezar();
r2.empezar();
r3.empezar();
}
}
When running it the output shows the action performed and the queue status, but there are some problems, like ignoring the maximum length of the queue, producing two elements at the same time and deleting the first one by accident, or going out of bounds of the queue. I assume that a "synchronized" statement is needed but I cant make it work without blocking the whole program.
Also, in the "Checker" class, I use a thread that constatly checks if the producers finished their work, but I'd like to know if there is a better way to check that without overheating my PC.
The output for this values in the main function should only produce up to three numbers and wait for a free spot to produce new ones
I hope my answer will help you.
First of all, your code is ignoring the maximum length of the queue because you are using ArrayList
, and its constructor parameter is initialCapacity, not the maximum capacity. The ArrayList
will be resized once its size reaches initialCapacity
. I suggest you to use LinkedBlockingQueue which has a maximum fixed capacity. This type of queue is also very suitable for your task because all producers will wait until there is free space in the queue and all consumers will wait until there is available elements. So you don't need semaphores.
To check that all producers have finished their work you could use CompletableFuture which provides many helpful methods.
The complete code would look like this:
public class Producer implements Runnable {
private BlockingQueue<Integer> queue;
private int a, b;
public Producer(BlockingQueue<Integer> queue, int a, int b) {
this.queue = queue;
this.a = a;
this.b = b;
}
@Override
public void run() {
for (int i = a; i <= b; i++){
try {
//producer will wait here if there is no space in the queue
queue.put(i);
System.out.println("Put: " + i);
Thread.sleep(50);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
}
public class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public boolean finish = false;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (!finish || queue.size() > 0) {
// consumer will wait here if the queue is empty;
// we have to poll with timeout because several consumers may pass
// here while queue size is less than number of consumers;
// timeout should be at least equal to producing interval
Integer temp = queue.poll(3, TimeUnit.SECONDS);
if (temp != null) {
System.out.println("Took: " + temp);
Thread.sleep(2000);
}
}
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
And to test it:
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3); //queue of capacity = 3
Producer p1 = new Producer(queue, 10, 20);
Producer p2 = new Producer(queue, -50, -40);
List<Consumer> consumers = new ArrayList<>();
CompletableFuture[] consumersFutures = new CompletableFuture[3];
for (int i = 0; i < 3; i++) {
Consumer consumer = new Consumer(queue);
consumers.add(consumer);
//this static method runs Runnable in separate thread
consumersFutures[i] = CompletableFuture.runAsync(consumer);
}
CompletableFuture[] producersFutures = new CompletableFuture[2];
producersFutures[0] = CompletableFuture.runAsync(p1);
producersFutures[1] = CompletableFuture.runAsync(p2);
// allOf returns new CompletableFuture that is completed only
// when the last given future completes
CompletableFuture.allOf(producersFutures).thenAccept(v -> {
System.out.println("Completed producing!");
for (Consumer consumer: consumers) {
consumer.finish = true;
}
});
// waiting for all consumers to complete
CompletableFuture.allOf(consumersFutures).get();
System.out.println("Completed consuming!");