Tags: java, multithreading, mutex, thread-synchronization

Producer-consumer: mutex, synchronization, critical section


I've tried some typical multithreading examples, and now I would like to try the typical producer-consumer problem.

(A producer can produce only if there is space and the consumer is not consuming, and vice versa.)

But I have a problem with shared resources. Is there something in Java like semaphores in C (used with the wait and post functions)?

  • I have found a synchronized example,
  • but I would like to try something that I can control manually (like semaphores in C).

I have:

  • class MyThread implements Runnable: base class for my threads
  • class Producer extends MyThread: producer thread
  • class Consumer extends MyThread: consumer thread
  • class ThreadContainer: shared resource (stock)

In ThreadContainer I have prepared a lock that I found and tried, but it doesn't work as it should:

java.lang.IllegalMonitorStateException
    at java.lang.Object.notify(Native Method)Running Consumer 0 [1]
java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Unknown Source)
(etc.)

I would be grateful if somebody could explain how to do this.

The classes are below.

MyThread:

public class MyThread implements Runnable {
    private Thread t;
    private String threadName;

    private ThreadContainer container;

    MyThread(String name, ThreadContainer cont) {
        threadName = name;
        this.container = cont;
        System.out.println("Creating " + threadName);
    }

    public void run() {

    }

    public void start() {
        System.out.println("Starting " + threadName);
        if (t == null) {
            t = new Thread(this, threadName);
            t.start();
        }
    }

    public ThreadContainer getContainer() {
        return container;
    }

    public String getThreadName() {
        return threadName;
    }
}

Producer:

 public class Producer extends MyThread {

    Producer(String name, ThreadContainer cont) {
        super(name, cont);
    }

    public void produce(int amount) {
        super.getContainer().produce(amount);
    }

    @Override
    public void run() {
        System.out.println("Running " + super.getThreadName());
        try {
            for (int i = 10; i > 0; i--) {

                synchronized (super.getContainer().lock) {
                    System.out.println(super.getThreadName()
                            + " want to produce: " + i);
                    while (!super.getContainer().canProduce(i)) {
                        super.getContainer().lock.wait();
                    }
                    System.out.println(super.getThreadName() + " producing: "
                            + i);
                    super.getContainer().produce(i);
                    System.out.println("Container state: "
                            + super.getContainer());
                }

            }

            Thread.sleep(50);
        } catch (InterruptedException e) {
            System.out.println("Thread " + super.getThreadName()
                    + " interrupted.");
        }

        System.out.println("Thread " + super.getThreadName() + " exiting.");
    }

}

Consumer:

 public class Consumer extends MyThread {

    Consumer(String name, ThreadContainer cont) {
        super(name, cont);
    }

    public void consume(int am) {
        super.getContainer().consume(am);
    }

    @Override
    public void run() {
        System.out.println("Running " + super.getThreadName());
        try {
            for (int i = 10; i > 0; i--) {
                synchronized (super.getContainer().lock) {
                    System.out.println(super.getThreadName()
                            + " want to consume: " + i);
                    while (!super.getContainer().canConsume(i)) {
                        super.getContainer().lock.wait();
                    }
                    System.out.println(super.getThreadName() + " consuming: "
                            + i);
                    super.getContainer().consume(i);
                    System.out.println("Container state: "
                            + super.getContainer());
                }
            }

            Thread.sleep(50);
        } catch (InterruptedException e) {
            System.out.println("Thread " + super.getThreadName()
                    + " interrupted.");
        }

        System.out.println("Thread " + super.getThreadName() + " exiting.");
    }

}

Container:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadContainer {
    private int capacity;
    private int value;

    private Lock locky = new ReentrantLock(true);

    public ThreadContainer(int capacity) {
        this.capacity = capacity;
        this.value = 0; 
    }

    public void produce(int amount){
        if(this.value + amount <= this.capacity){
            this.value += amount;
        }else{
            this.value = capacity;
        }
    }

    public void consume(int amount){
        if(this.value - amount >= 0 ){
            this.value -= amount;
        }else{
            this.value =0;
        } 
    }

    public boolean canProduce(int am){
        return (this.value + am) <= this.capacity;
    }

    public boolean canConsume(int am){
        return (this.value - am) >= 0;
    }

    public boolean tryLock(){
        if(this.locky.tryLock()){
            this.locky.lock();
            return true;
        }else{
            return false;
        }
    }

    public void unlock(){
        this.locky.unlock();
        this.locky.notify();
    }

    public void waitLock() throws InterruptedException{
        this.locky.wait();
    }


    @Override
    public String toString() {
        return "capacity: " + this.capacity + ", value: " + this.value;
    }

}

MainClass:

public class RunFrom {
    public static void main(String args[]) {
        ThreadContainer container = new ThreadContainer(25);

        /*
        Producer prod = new Producer("Producer", container);
        prod.start();

        Consumer cons = new Consumer("Consumer", container);
        cons.start();
        */

        int prodCount =0;
        int conCount =0;
        for (int i = 0; i < 5; i++) {
            if(i%2 == 0){
                Producer prod = new Producer("Producer " + prodCount + " [" + i + "]", container);
                prodCount++;
                prod.start();
            }else{
                Consumer cons = new Consumer("Consumer " + conCount + " [" + i + "]", container);
                conCount++;
                cons.start();
            }
        }
    }
}

So, I made modifications following the link that @fildor posted. It looks like it works fine for 2 threads (1 consumer and 1 producer), but there is still a problem when I create more threads.

  • MyThread is the same as the original
  • the consumer just consumes
  • the producer just produces
  • locking is handled in the stock container

Consumer

//...
    try {
                for (int i = 10; i > 0; i--) {
                    System.out.println(super.getThreadName() + " want to consume: "
                            + i);
                    System.out.println(super.getThreadName() + " consuming: " + i);
                    super.getContainer().consume(i);
                    System.out.println("Container state: " + super.getContainer());
                    Thread.sleep(100);
                }

            } catch (InterruptedException e) {
                System.out.println("Thread " + super.getThreadName()
                        + " interrupted.");
            }
//...

Producer

//...
try {
            for (int i = 10; i > 0; i--) {
                System.out.println(super.getThreadName() + " want to produce: "
                        + i);
                System.out.println(super.getThreadName() + " producing: " + i);
                super.getContainer().produce(i);
                System.out.println("Container state: " + super.getContainer());
                Thread.sleep(100);
            }

        } catch (InterruptedException e) {
            System.out.println("Thread " + super.getThreadName()
                    + " interrupted.");
        }
//...

Stock container

//...
final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
//...

public void produce(int amount) {
        lock.lock();
        try {
            while (!canProduce(amount)) {
                notFull.wait();
            }

            if (this.value + amount <= this.capacity) {
                this.value += amount;
            } else {
                this.value = capacity;
            }
            notEmpty.signal();
        } catch (InterruptedException e) {
            System.out.println("InterruptedException" + e);
        } finally {
            lock.unlock();
        }
    }

    public void consume(int amount) {
        lock.lock();
        try {
            while (!canConsume(amount)) {
                notEmpty.wait();
            }

            if (this.value - amount >= 0) {
                this.value -= amount;
            } else {
                this.value = 0;
            }
            notFull.signal();
        } catch (InterruptedException e) {
            System.out.println("InterruptedException" + e);
        } finally {
            lock.unlock();
        }
    }

For 4 threads (2 producers and 2 consumers) the output looks like this:

Creating Producer 0 [0]
Starting Producer 0 [0]
Running Producer 0 [0]
Producer 0 [0] want to produce: 10
Producer 0 [0] producing: 10
Container state: capacity: 25, value: 10
Creating Consumer 0 [1]
Starting Consumer 0 [1]
Creating Producer 1 [2]
Starting Producer 1 [2]
Creating Consumer 1 [3]
Running Consumer 0 [1]
Starting Consumer 1 [3]
Creating Producer 2 [4]
Starting Producer 2 [4]
Running Producer 1 [2]
Producer 1 [2] want to produce: 10
Producer 1 [2] producing: 10
Consumer 0 [1] want to consume: 10
Consumer 0 [1] consuming: 10
Container state: capacity: 25, value: 20
Container state: capacity: 25, value: 10
Running Consumer 1 [3]
Consumer 1 [3] want to consume: 10
Running Producer 2 [4]
Producer 2 [4] want to produce: 10
Producer 2 [4] producing: 10
Container state: capacity: 25, value: 20
Consumer 1 [3] consuming: 10
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 9
Producer 0 [0] producing: 9
Container state: capacity: 25, value: 19
Consumer 0 [1] want to consume: 9
Consumer 0 [1] consuming: 9
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 9
Producer 1 [2] producing: 9
Container state: capacity: 25, value: 19
Producer 2 [4] want to produce: 9
Producer 2 [4] producing: 9
Exception in thread "Producer 2 [4]" Consumer 1 [3] want to consume: 9
Consumer 1 [3] consuming: 9
Container state: capacity: 25, value: 10
java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Unknown Source)
    at test.ThreadContainer.produce(ThreadContainer.java:24)
    at test.Producer.run(Producer.java:21)
    at java.lang.Thread.run(Unknown Source)
Producer 0 [0] want to produce: 8
Producer 0 [0] producing: 8
Container state: capacity: 25, value: 18
Consumer 0 [1] want to consume: 8
Consumer 0 [1] consuming: 8
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 8
Producer 1 [2] producing: 8
Container state: capacity: 25, value: 18
Consumer 1 [3] want to consume: 8
Consumer 1 [3] consuming: 8
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 7
Producer 0 [0] producing: 7
Container state: capacity: 25, value: 17
Consumer 0 [1] want to consume: 7
Consumer 0 [1] consuming: 7
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 7
Producer 1 [2] producing: 7
Container state: capacity: 25, value: 17
Consumer 1 [3] want to consume: 7
Consumer 1 [3] consuming: 7
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 6
Producer 0 [0] producing: 6
Container state: capacity: 25, value: 16
Consumer 0 [1] want to consume: 6
Consumer 0 [1] consuming: 6
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 6
Producer 1 [2] producing: 6
Container state: capacity: 25, value: 16
Consumer 1 [3] want to consume: 6
Consumer 1 [3] consuming: 6
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 5
Producer 0 [0] producing: 5
Container state: capacity: 25, value: 15
Consumer 0 [1] want to consume: 5
Consumer 0 [1] consuming: 5
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 5
Producer 1 [2] producing: 5
Container state: capacity: 25, value: 15
Consumer 1 [3] want to consume: 5
Consumer 1 [3] consuming: 5
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 4
Producer 0 [0] producing: 4
Container state: capacity: 25, value: 14
Consumer 0 [1] want to consume: 4
Consumer 0 [1] consuming: 4
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 4
Producer 1 [2] producing: 4
Container state: capacity: 25, value: 14
Consumer 1 [3] want to consume: 4
Consumer 1 [3] consuming: 4
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 3
Producer 0 [0] producing: 3
Container state: capacity: 25, value: 13
Consumer 0 [1] want to consume: 3
Consumer 0 [1] consuming: 3
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 3
Producer 1 [2] producing: 3
Container state: capacity: 25, value: 13
Consumer 1 [3] want to consume: 3
Consumer 1 [3] consuming: 3
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 2
Producer 0 [0] producing: 2
Container state: capacity: 25, value: 12
Consumer 0 [1] want to consume: 2
Consumer 0 [1] consuming: 2
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 2
Producer 1 [2] producing: 2
Container state: capacity: 25, value: 12
Consumer 1 [3] want to consume: 2
Consumer 1 [3] consuming: 2
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 1
Producer 0 [0] producing: 1
Container state: capacity: 25, value: 11
Consumer 0 [1] want to consume: 1
Consumer 0 [1] consuming: 1
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 1
Producer 1 [2] producing: 1
Container state: capacity: 25, value: 11
Consumer 1 [3] want to consume: 1
Consumer 1 [3] consuming: 1
Container state: capacity: 25, value: 10
Thread Producer 0 [0] exiting.
Thread Consumer 0 [1] exiting.
Thread Producer 1 [2] exiting.
Thread Consumer 1 [3] exiting.

Could the problem be that several consumers are waiting for the same signal while several producers are waiting for another one?


Solution

  • You should really read a "Threads in Java 101" tutorial somewhere. The exception you are getting occurs because you are waiting on an object without holding its intrinsic lock. Given any object referred to as lock, the idiomatic code is:

    synchronized (lock) {
      while (!condition) {
        lock.wait();
      }
    }
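
The same rule explains the exception in the updated container from the question. Calling notFull.wait() on a Condition invokes Object.wait(), which requires the monitor of that Condition object; holding the ReentrantLock does not count. The Condition counterparts are await(), signal() and signalAll(). Below is a minimal sketch of the container under that assumption. The class name BoundedStock, the getValue() accessor and the small main driver are hypothetical and not part of the original code. It uses signalAll() rather than signal() because threads wait for different amounts, so waking a single waiter may wake one that still cannot proceed.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical rework of the question's ThreadContainer (the name is an assumption).
public class BoundedStock {
    private final int capacity;
    private int value = 0;

    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedStock(int capacity) {
        this.capacity = capacity;
    }

    public void produce(int amount) throws InterruptedException {
        lock.lock();
        try {
            // Re-check the condition in a loop after every wake-up.
            while (value + amount > capacity) {
                notFull.await(); // Condition.await(), not Object.wait()
            }
            value += amount;
            notEmpty.signalAll(); // wake every waiting consumer
        } finally {
            lock.unlock();
        }
    }

    public void consume(int amount) throws InterruptedException {
        lock.lock();
        try {
            while (value - amount < 0) {
                notEmpty.await();
            }
            value -= amount;
            notFull.signalAll(); // wake every waiting producer
        } finally {
            lock.unlock();
        }
    }

    public int getValue() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }

    // Small driver: 2 producers and 2 consumers, each handling 10, 9, ..., 1.
    public static void main(String[] args) throws InterruptedException {
        BoundedStock stock = new BoundedStock(25);
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            boolean producer = (t % 2 == 0);
            threads[t] = new Thread(() -> {
                try {
                    for (int i = 10; i > 0; i--) {
                        if (producer) {
                            stock.produce(i);
                        } else {
                            stock.consume(i);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[t].start();
        }
        for (Thread th : threads) {
            th.join();
        }
        System.out.println("Final value: " + stock.getValue());
    }
}
```

Because the producers and consumers in this driver move the same total amount, the run should finish with the value back at 0. For something closer to C semaphores, java.util.concurrent.Semaphore offers acquire() and release(), which correspond roughly to wait and post.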