I've tried some typical multithreading examples, and now I would like to try the typical producer-consumer problem.
(The producer can produce if there is space and the consumer is not consuming, and vice versa.)
But I have a problem with shared resources. Is there something in Java like semaphores in C (used with the wait and post functions)?
I have:
class MyThread implements Runnable - basic class for my threads
class Producer extends MyThread - producer thread
class Consumer extends MyThread - consumer class
class ThreadContainer - shared resources (stock)
In ThreadContainer I have prepared a lock, which I found and tried, but it doesn't work as it should:
java.lang.IllegalMonitorStateException
at java.lang.Object.notify(Native Method)
Running Consumer 0 [1]
java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Unknown Source)
(etc.)
I would be grateful if somebody could explain how to do this.
The classes are below.
MyThread:
public class MyThread implements Runnable {
private Thread t;
private String threadName;
private ThreadContainer container;
MyThread(String name, ThreadContainer cont) {
threadName = name;
this.container = cont;
System.out.println("Creating " + threadName);
}
public void run() {
}
public void start() {
System.out.println("Starting " + threadName);
if (t == null) {
t = new Thread(this, threadName);
t.start();
}
}
public ThreadContainer getContainer() {
return container;
}
public String getThreadName() {
return threadName;
}
}
Producer:
public class Producer extends MyThread {
Producer(String name, ThreadContainer cont) {
super(name, cont);
}
public void produce(int amount) {
super.getContainer().produce(amount);
}
@Override
public void run() {
System.out.println("Running " + super.getThreadName());
try {
for (int i = 10; i > 0; i--) {
synchronized (super.getContainer().lock) {
System.out.println(super.getThreadName()
+ " want to produce: " + i);
while (!super.getContainer().canProduce(i)) {
super.getContainer().lock.wait();
}
System.out.println(super.getThreadName() + " producing: "
+ i);
super.getContainer().produce(i);
System.out.println("Container state: "
+ super.getContainer());
}
}
Thread.sleep(50);
} catch (InterruptedException e) {
System.out.println("Thread " + super.getThreadName()
+ " interrupted.");
}
System.out.println("Thread " + super.getThreadName() + " exiting.");
}
}
Consumer:
public class Consumer extends MyThread {
Consumer(String name, ThreadContainer cont) {
super(name, cont);
}
public void consume(int am) {
super.getContainer().consume(am);
}
@Override
public void run() {
System.out.println("Running " + super.getThreadName());
try {
for (int i = 10; i > 0; i--) {
synchronized (super.getContainer().lock) {
System.out.println(super.getThreadName()
+ " want to consume: " + i);
while (!super.getContainer().canConsume(i)) {
super.getContainer().lock.wait();
}
System.out.println(super.getThreadName() + " consuming: "
+ i);
super.getContainer().consume(i);
System.out.println("Container state: "
+ super.getContainer());
}
}
Thread.sleep(50);
} catch (InterruptedException e) {
System.out.println("Thread " + super.getThreadName()
+ " interrupted.");
}
System.out.println("Thread " + super.getThreadName() + " exiting.");
}
}
Container:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadContainer {
private int capacity;
private int value;
private Lock locky = new ReentrantLock(true);
public ThreadContainer(int capacity) {
this.capacity = capacity;
this.value = 0;
}
public void produce(int amount){
if(this.value + amount <= this.capacity){
this.value += amount;
}else{
this.value = capacity;
}
}
public void consume(int amount){
if(this.value - amount >= 0 ){
this.value -= amount;
}else{
this.value =0;
}
}
public boolean canProduce(int am){
return (this.value + am) <= this.capacity;
}
public boolean canConsume(int am){
return (this.value - am) >= 0;
}
public boolean tryLock(){
if(this.locky.tryLock()){
this.locky.lock();
return true;
}else{
return false;
}
}
public void unlock(){
this.locky.unlock();
this.locky.notify();
}
public void waitLock() throws InterruptedException{
this.locky.wait();
}
@Override
public String toString() {
return "capacity: " + this.capacity + ", value: " + this.value;
}
}
MainClass:
public class RunFrom {
public static void main(String args[]) {
ThreadContainer container = new ThreadContainer(25);
/*
Producer prod = new Producer("Producer", container);
prod.start();
Consumer cons = new Consumer("Consumer", container);
cons.start();
*/
int prodCount =0;
int conCount =0;
for (int i = 0; i < 5; i++) {
if(i%2 == 0){
Producer prod = new Producer("Producer " + prodCount + " [" + i + "]", container);
prodCount++;
prod.start();
}else{
Consumer cons = new Consumer("Consumer " + conCount + " [" + i + "]", container);
conCount++;
cons.start();
}
}
}
}
So, I made modifications following the link @fildor posted. It looks like it works fine for 2 threads (1 consumer and 1 producer), but there is still a problem when I create more threads.
Consumer
//...
try {
for (int i = 10; i > 0; i--) {
System.out.println(super.getThreadName() + " want to consume: "
+ i);
System.out.println(super.getThreadName() + " consuming: " + i);
super.getContainer().consume(i);
System.out.println("Container state: " + super.getContainer());
Thread.sleep(100);
}
} catch (InterruptedException e) {
System.out.println("Thread " + super.getThreadName()
+ " interrupted.");
}
//...
Producer
//...
try {
for (int i = 10; i > 0; i--) {
System.out.println(super.getThreadName() + " want to produce: "
+ i);
System.out.println(super.getThreadName() + " producing: " + i);
super.getContainer().produce(i);
System.out.println("Container state: " + super.getContainer());
Thread.sleep(100);
}
} catch (InterruptedException e) {
System.out.println("Thread " + super.getThreadName()
+ " interrupted.");
}
//...
Stock container
//...
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
//...
public void produce(int amount) {
lock.lock();
try {
while (!canProduce(amount)) {
notFull.wait();
}
if (this.value + amount <= this.capacity) {
this.value += amount;
} else {
this.value = capacity;
}
notEmpty.signal();
} catch (InterruptedException e) {
System.out.println("InterruptedException" + e);
} finally {
lock.unlock();
}
}
public void consume(int amount) {
lock.lock();
try {
while (!canConsume(amount)) {
notEmpty.wait();
}
if (this.value - amount >= 0) {
this.value -= amount;
} else {
this.value = 0;
}
notFull.signal();
} catch (InterruptedException e) {
System.out.println("InterruptedException" + e);
} finally {
lock.unlock();
}
}
For 5 threads (3 producers and 2 consumers, as created by the loop in main) the output looks like:
Creating Producer 0 [0]
Starting Producer 0 [0]
Running Producer 0 [0]
Producer 0 [0] want to produce: 10
Producer 0 [0] producing: 10
Container state: capacity: 25, value: 10
Creating Consumer 0 [1]
Starting Consumer 0 [1]
Creating Producer 1 [2]
Starting Producer 1 [2]
Creating Consumer 1 [3]
Running Consumer 0 [1]
Starting Consumer 1 [3]
Creating Producer 2 [4]
Starting Producer 2 [4]
Running Producer 1 [2]
Producer 1 [2] want to produce: 10
Producer 1 [2] producing: 10
Consumer 0 [1] want to consume: 10
Consumer 0 [1] consuming: 10
Container state: capacity: 25, value: 20
Container state: capacity: 25, value: 10
Running Consumer 1 [3]
Consumer 1 [3] want to consume: 10
Running Producer 2 [4]
Producer 2 [4] want to produce: 10
Producer 2 [4] producing: 10
Container state: capacity: 25, value: 20
Consumer 1 [3] consuming: 10
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 9
Producer 0 [0] producing: 9
Container state: capacity: 25, value: 19
Consumer 0 [1] want to consume: 9
Consumer 0 [1] consuming: 9
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 9
Producer 1 [2] producing: 9
Container state: capacity: 25, value: 19
Producer 2 [4] want to produce: 9
Producer 2 [4] producing: 9
Exception in thread "Producer 2 [4]" Consumer 1 [3] want to consume: 9
Consumer 1 [3] consuming: 9
Container state: capacity: 25, value: 10
java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Unknown Source)
at test.ThreadContainer.produce(ThreadContainer.java:24)
at test.Producer.run(Producer.java:21)
at java.lang.Thread.run(Unknown Source)
Producer 0 [0] want to produce: 8
Producer 0 [0] producing: 8
Container state: capacity: 25, value: 18
Consumer 0 [1] want to consume: 8
Consumer 0 [1] consuming: 8
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 8
Producer 1 [2] producing: 8
Container state: capacity: 25, value: 18
Consumer 1 [3] want to consume: 8
Consumer 1 [3] consuming: 8
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 7
Producer 0 [0] producing: 7
Container state: capacity: 25, value: 17
Consumer 0 [1] want to consume: 7
Consumer 0 [1] consuming: 7
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 7
Producer 1 [2] producing: 7
Container state: capacity: 25, value: 17
Consumer 1 [3] want to consume: 7
Consumer 1 [3] consuming: 7
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 6
Producer 0 [0] producing: 6
Container state: capacity: 25, value: 16
Consumer 0 [1] want to consume: 6
Consumer 0 [1] consuming: 6
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 6
Producer 1 [2] producing: 6
Container state: capacity: 25, value: 16
Consumer 1 [3] want to consume: 6
Consumer 1 [3] consuming: 6
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 5
Producer 0 [0] producing: 5
Container state: capacity: 25, value: 15
Consumer 0 [1] want to consume: 5
Consumer 0 [1] consuming: 5
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 5
Producer 1 [2] producing: 5
Container state: capacity: 25, value: 15
Consumer 1 [3] want to consume: 5
Consumer 1 [3] consuming: 5
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 4
Producer 0 [0] producing: 4
Container state: capacity: 25, value: 14
Consumer 0 [1] want to consume: 4
Consumer 0 [1] consuming: 4
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 4
Producer 1 [2] producing: 4
Container state: capacity: 25, value: 14
Consumer 1 [3] want to consume: 4
Consumer 1 [3] consuming: 4
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 3
Producer 0 [0] producing: 3
Container state: capacity: 25, value: 13
Consumer 0 [1] want to consume: 3
Consumer 0 [1] consuming: 3
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 3
Producer 1 [2] producing: 3
Container state: capacity: 25, value: 13
Consumer 1 [3] want to consume: 3
Consumer 1 [3] consuming: 3
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 2
Producer 0 [0] producing: 2
Container state: capacity: 25, value: 12
Consumer 0 [1] want to consume: 2
Consumer 0 [1] consuming: 2
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 2
Producer 1 [2] producing: 2
Container state: capacity: 25, value: 12
Consumer 1 [3] want to consume: 2
Consumer 1 [3] consuming: 2
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 1
Producer 0 [0] producing: 1
Container state: capacity: 25, value: 11
Consumer 0 [1] want to consume: 1
Consumer 0 [1] consuming: 1
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 1
Producer 1 [2] producing: 1
Container state: capacity: 25, value: 11
Consumer 1 [3] want to consume: 1
Consumer 1 [3] consuming: 1
Container state: capacity: 25, value: 10
Thread Producer 0 [0] exiting.
Thread Consumer 0 [1] exiting.
Thread Producer 1 [2] exiting.
Thread Consumer 1 [3] exiting.
Isn't the problem that several consumers are waiting for the same signal, and several producers for another one?
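A note on the stack trace above: it points at Object.wait inside ThreadContainer.produce. A Condition obtained from lock.newCondition() is waited on with await() and woken with signal() or signalAll(). Calling wait() on it instead uses the Condition object's own monitor, which the thread does not hold, and that raises IllegalMonitorStateException. With several producers and consumers requesting different amounts, signalAll() is probably the safer choice, since a single signal() may wake a thread whose request still does not fit. Below is a minimal sketch under those assumptions; the class name BoundedStock, the getValue() accessor and the small main driver are illustrative and not part of the original post.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for ThreadContainer; names are illustrative.
public class BoundedStock {
    private final int capacity;
    private int value = 0;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedStock(int capacity) {
        this.capacity = capacity;
    }

    public void produce(int amount) throws InterruptedException {
        lock.lock();
        try {
            // Re-check the predicate after every wake-up.
            while (value + amount > capacity) {
                notFull.await();      // await(), not Object.wait()
            }
            value += amount;
            notEmpty.signalAll();     // wake every waiting consumer
        } finally {
            lock.unlock();
        }
    }

    public void consume(int amount) throws InterruptedException {
        lock.lock();
        try {
            while (value < amount) {
                notEmpty.await();
            }
            value -= amount;
            notFull.signalAll();      // wake every waiting producer
        } finally {
            lock.unlock();
        }
    }

    public int getValue() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedStock stock = new BoundedStock(25);
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            final boolean producer = (t % 2 == 0);   // 2 producers, 2 consumers
            threads[t] = new Thread(() -> {
                try {
                    for (int i = 10; i > 0; i--) {
                        if (producer) {
                            stock.produce(i);
                        } else {
                            stock.consume(i);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[t].start();
        }
        for (Thread th : threads) {
            th.join();
        }
        System.out.println("Final value: " + stock.getValue());
    }
}
```

Unlike the version in the question, this sketch lets InterruptedException propagate to the caller instead of catching it inside produce and consume, so an interrupted thread does not silently carry on as if the operation had succeeded.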
You should really read a "Threads in Java 101" tutorial somewhere. The exception you are getting occurs because you are waiting on an object without holding its intrinsic lock. Given any object identified with lock, the idiomatic code is:
synchronized (lock) {
    while (!condition) {
        lock.wait();
    }
}
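Applied to a container like the one in the question, that idiom might look like the sketch below. It assumes the container object itself serves as the monitor (synchronized methods lock on this) and uses notifyAll() so that every waiting thread gets to re-check its own condition. The class name StockMonitor, the getValue() accessor and the main driver are hypothetical and only there to make the example self-contained.

```java
// Hypothetical monitor-style container; not the original ThreadContainer.
public class StockMonitor {
    private final int capacity;
    private int value = 0;

    public StockMonitor(int capacity) {
        this.capacity = capacity;
    }

    // synchronized acquires the intrinsic lock of 'this', so wait() is legal here.
    public synchronized void produce(int amount) throws InterruptedException {
        while (value + amount > capacity) {
            wait();          // releases the lock while waiting, reacquires on wake-up
        }
        value += amount;
        notifyAll();         // must also be called while holding the lock
    }

    public synchronized void consume(int amount) throws InterruptedException {
        while (value < amount) {
            wait();
        }
        value -= amount;
        notifyAll();
    }

    public synchronized int getValue() {
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        StockMonitor stock = new StockMonitor(25);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 10; i > 0; i--) {
                    stock.produce(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 10; i > 0; i--) {
                    stock.consume(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        System.out.println("Final value: " + stock.getValue());
    }
}
```

Both wait() and notifyAll() are called only inside synchronized methods of the same object, which is what avoids the IllegalMonitorStateException. Calling wait() or notify() on a java.util.concurrent.locks.Lock instance, as the original ThreadContainer does, fails for the same reason: holding the Lock is not the same as holding that object's intrinsic monitor.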