I am having 1 producer and 2 (multiple) consumers.
The following code works fine(free from race conditions IMO) but the problem is whenever the buffer is empty,each consumer is executing an infinite loop, which is just wait of resources.
How can i optimise it? I am thinking of using something like notifying consumer in case anything is added to buffer but having problem in implementation.
public class Test {
public static void main(String[] args) {
Assembly assembly = new Assembly(new ArrayList<>(), new ReentrantLock(true));
new Thread(new Runnable() {
@Override
public void run() {
assembly.consume();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
assembly.produce();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
assembly.consume();
}
}).start();
}
}
class Assembly {
List<Integer> buffer;
Lock bufferLock;
public Assembly(List<Integer> buffer, Lock bufferLock) {
this.buffer = buffer;
this.bufferLock = bufferLock;
}
public void produce() {
Integer[] nums = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 99};
Random random = new Random();
for (Integer num : nums) {
try {
bufferLock.lock();
buffer.add(num);
if (num != 99) {
System.out.println("Added: " + num);
}
} finally {
bufferLock.unlock();
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
}
}
}
}
public void consume() {
while (true) {
try {
bufferLock.lock();
if (buffer.isEmpty()) {
**IS SOME OPTIMISATION POSSIBLE HERE?**
continue;
}
if (buffer.get(0).equals(99)) {
break;
}
System.out.println(
"Removed: " + buffer.remove(0) + " by " + Thread.currentThread().getName());
} finally {
bufferLock.unlock();
}
}
}
}
Use a Semaphore
to add/remove available elements
public class ProducerConsumer {
List<Integer> buffer = new ArrayList<>();
Semaphore available = new Semaphore(0);
public static void main(String... args) {
ProducerConsumer pc = new ProducerConsumer();
new Thread(pc::consume).start();
new Thread(pc::produce).start();
new Thread(pc::consume).start();
}
public void produce() {
Integer[] nums = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 99};
Random random = new Random();
for (Integer num : nums) {
try {
synchronized (buffer) {
buffer.add(num);
}
available.release();
System.out.println("Added: " + num);
} finally {
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
}
}
}
}
public void consume() {
while (true) {
try {
available.acquire();
synchronized (buffer) {
if (buffer.get(0).equals(99)) {
break;
}
System.out.println(
"Removed: " + buffer.remove(0) + " by " + Thread.currentThread().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
your (consumer) threads will be blocked until a new item is available.
With output
Added: 1
Removed: 1 by Thread-0
Added: 2
Removed: 2 by Thread-2
Added: 3
Removed: 3 by Thread-0
...