I was implementing Producer Consumer using Condition Variables and ran into a issue where the consumer blocks, basically the consumer thread doesn't pick up the last batch of product produced by the producer and thus never ends.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerImpl {
protected List<Integer> buffer = new ArrayList<Integer>();
//the lock on which condition variables are taken
protected volatile Lock lock = new ReentrantLock(true);
// the consumer will signal using this condition variable to the producer to start it's production
protected volatile Condition producerStartProducing = lock.newCondition();
// the producer will use this condition variable to start it's production
protected volatile Condition consumerStartConsuming = lock.newCondition();
class Consumer implements Callable<String> {
int num;
public Consumer(int i) {
this.num = i;
}
/**
* In a loop, take a lock each time, check if there is an item to consume from shared buffer
* If yes, then consume it and loop back
* If no, then wait for the producer to signal you.
* Signals the producer each time it consumes an item.
*/
@Override
public String call() throws Exception {
try {
int i = 0;
while (i < 10) {
while (buffer.isEmpty()) {
consumerStartConsuming.await();
}
lock.lock();
System.out.println("Consumer - " + i);
buffer.remove(buffer.size() - 1);
producerStartProducing.signalAll();
i++;
}
} finally {
lock.unlock();
}
System.out.println("Consumed All");
return "Consumed All";
}
}
class Producer implements Callable<String> {
int num;
public Producer(int i) {
this.num = i;
}
/**
* In a loop, take a lock, produce items in a batch of 3, and then wait till the
* consumer signals you to produce more.
* Signals the consumer each time it produces an item
*/
@Override
public String call() throws Exception {
try {
int i = 0;
while (i < 10) {
lock.lock();
while (buffer.size() > 2) {
producerStartProducing.await();
}
System.out.println("Producer - " + i);
buffer.add(1);
consumerStartConsuming.signalAll();
i++;
}
} finally {
lock.unlock();
}
System.out.println("Produced All");
return "Produced All";
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);
try {
ProducerConsumerImpl producerConsumerImpl = new ProducerConsumerImpl();
Future<String> p1 = newFixedThreadPool.submit(producerConsumerImpl.new Producer(1));
Future<String> c1 = newFixedThreadPool.submit(producerConsumerImpl.new Consumer(1));
final String string1 = p1.get();
final String string2 = c1.get();
System.out.println(string1 + " --- " + string2);
} finally {
newFixedThreadPool.shutdown();
}
}
}
Output:
Producer - 0
Producer - 1
Producer - 2
Consumer - 0
Consumer - 1
Consumer - 2
Producer - 3
Producer - 4
Producer - 5
Consumer - 3
Consumer - 4
Consumer - 5
Producer - 6
Producer - 7
Producer - 8
Consumer - 6
Consumer - 7
Consumer - 8
Producer - 9
Produced All
The producer code produces three products in a batch and then waits for the consumer to pick them up using a condition variable signal.
Similarly the consumer threads picks up the items and signals the producer to produce more each time it takes an item from the buffer.
This is just arbitrary implementation but the problem is existent for any other combination of such actions in this code, would appreciate if someone can point what's going wrong with this piece of code and point out what's going on wrong here.
locking scope was incorrect so I have fix it below code.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerImpl {
protected List<Integer> buffer = new ArrayList<Integer>();
// the lock on which condition variables are taken
protected volatile Lock lock = new ReentrantLock(true);
// the consumer will signal using this condition variable to the producer to
// start it's production
protected volatile Condition producerStartProducing = lock.newCondition();
// the producer will use this condition variable to start it's production
protected volatile Condition consumerStartConsuming = lock.newCondition();
class Consumer implements Callable<String> {
int num;
public Consumer(int i) {
this.num = i;
}
/**
* In a loop, take a lock each time, check if there is an item to
* consume from shared buffer If yes, then consume it and loop back If
* no, then wait for the producer to signal you. Signals the producer
* each time it consumes an item.
*/
@Override
public String call() throws Exception {
int i = 0;
while (i < 10) {
try {
lock.lock();
while (buffer.isEmpty()) {
consumerStartConsuming.await();
}
System.out.println("Consumer - " + i);
buffer.remove(buffer.size() - 1);
producerStartProducing.signalAll();
i++;
} finally {
lock.unlock();
}
}
System.out.println("Consumed All");
return "Consumed All";
}
}
class Producer implements Callable<String> {
int num;
public Producer(int i) {
this.num = i;
}
/**
* In a loop, take a lock, produce items in a batch of 3, and then wait
* till the consumer signals you to produce more. Signals the consumer
* each time it produces an item
*/
@Override
public String call() throws Exception {
int i = 0;
while (i < 10) {
try {
lock.lock();
while (buffer.size() > 2) {
producerStartProducing.await();
}
System.out.println("Producer - " + i);
buffer.add(1);
consumerStartConsuming.signalAll();
i++;
} finally {
lock.unlock();
}
}
System.out.println("Produced All");
return "Produced All";
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);
try {
ProducerConsumerImpl producerConsumerImpl = new ProducerConsumerImpl();
Future<String> p1 = newFixedThreadPool.submit(producerConsumerImpl.new Producer(1));
Future<String> c1 = newFixedThreadPool.submit(producerConsumerImpl.new Consumer(1));
final String string1 = p1.get();
final String string2 = c1.get();
System.out.println(string1 + " --- " + string2);
} finally {
newFixedThreadPool.shutdown();
}
}
}