i have to finish an exercise where i have to go find ".java" files in my folder path using the producer/consumer pattern with at least one producer thread and x consumer threads.
ProducerConsumer-class: First i tried to stop the consumer when the producer is finished finding files with setting a while loop from true to false which doesn't work. It doesn't work because the threads are still running obviously just not doing anything useful. Now i use a closePool() function (as well).
So the function does work if i dont put up with my locks called locka. And thats basically something i don't understand.
So if i have
loka.lock();
ende = false;
loka.unlock();
and
while(ende){
loka.lock();
System.out.println(xy.getQueue());
loka.unlock();
}
the closePool() function will never get called. And this is something i don't understand. If i put away the locks in the while loop it does work and the threads do stop.
questions:
1) The ende parameter will be set false anyway so the lock will be finally released.
2) Secondly i did only lock a part of the method and not the object?! As far as i understand it other code in other methods in the same object will still work at the same time. Or is the lock like synchronized and i synchronize the whole object while it is in the lock state? In my understanding the while loop in the consumer-thread is locked but the producer-thread will still call closePool();
on a extra note: maybe i didn't even design my Producer/Consumer pattern the right way.
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class FindJavaVisitorp extends SimpleFileVisitor<Path> {
private BlockingQueue<String> xxx = new ArrayBlockingQueue<String>(10);
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (file.toString().endsWith(".java")) {
try {
xxx.put(file.toString());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return FileVisitResult.CONTINUE;
}
public String getQueue() throws InterruptedException {
return xxx.take();
}
}
public class ProducerConsumer {
private volatile boolean ende = true;
private Path path;
private FindJavaVisitorp xy;
private Lock loka = new ReentrantLock();
private ExecutorService pepe;
public ProducerConsumer(Path path, FindJavaVisitorp xy, ExecutorService xyz) {
this.path = path;
this.xy = xy;
pepe = xyz;
}
public void produce() throws IOException, InterruptedException {
Files.walkFileTree(path, xy);
loka.lock();
ende = false;
loka.unlock();
closePool();
}
public void consume() throws InterruptedException {
while (ende) {
loka.lock();
System.out.println(xy.getQueue());
loka.unlock();
}
}
public void closePool() {
pepe.shutdown();
try {
if (!pepe.awaitTermination(60, TimeUnit.SECONDS)) {
pepe.shutdownNow();
if (!pepe.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Pool couldn't be terminated!");
}
}
} catch (InterruptedException e) {
pepe.shutdownNow();
}
}
}
public class Test {
public static void main(String[] args) {
Path startingDir = Paths.get("/usr/local/");
FindJavaVisitorp x = new FindJavaVisitorp();
ExecutorService exec = Executors.newCachedThreadPool();
final ProducerConsumer pp = new ProducerConsumer(startingDir, x, exec);
exec.submit(new Runnable() {
public void run() {
try {
pp.produce();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
// x.printQueue();
for (int j = 0; j < 5; j++) {
exec.submit(new Runnable() {
public void run() {
try {
pp.consume();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
exec.shutdown();
}
}
BlockingQueue
and if queue is empty then then will not get a chance to check the flag variable. Also you don't need locks as you're already using BlockingQueue..
Following code will try to guard loka=false
from any concurrent access.
loka.lock();
ende = false;//critical section
loka.unlock();
Following code will be free from concurrent access and mutually exclusive from the above critical section.
while(ende){
loka.lock();
System.out.println(xy.getQueue());//critical section
loka.unlock();
}
As there's nothing common between these two critical sections, mutual exclusion is doing nothing. Since ende
is volatile guarding it with locks doesn't do anything as primitive types already have atomic access.
Reads and writes are atomic for reference variables and for most primitive variables (all types except long and double).
Reads and writes are atomic for all variables declared volatile (including long and double variables).
lock() and
unlock()` will be locked from concurrent access. Object itself is free to do any concurrent simultaneous (to the locked block) task outside these blocks.And finally follow proper naming conventions and give your variables meaningful names.
Main answer to your problem why your threads are still running is because they're waiting on the blockingQueue.takeItem()
and they can not be released from it unless queue is filled again, however since Producer is finished there's no possibility of that happening.
How to avoid this behavior
There are no methods on BlockingQueue which allow immediate release of waiting threads One thing we can do is make producer put a LAST_ITEM and have consumers check if the item they got is LAST_ITEM and thus they can release themselves.
Following is working code. I have made some modifications to the variable and method names to make them more meaningful.
JavaFileVisitor
package filevisitor;
import java.nio.file.FileVisitResult;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class JavaFileVisitor extends SimpleFileVisitor<Path> {
private BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
public static String NO_MORE_ITEMS = "### NO MORE ITEMS ###";
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (file.toString().endsWith(".java")) {
try {
blockingQueue.put(file.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return FileVisitResult.CONTINUE;
}
public String getQueueItem() throws InterruptedException {
String item = blockingQueue.take();
if(NO_MORE_ITEMS.equals(item)) {
setNoMoreItems();
}
return item;
}
public void setNoMoreItems() {
try {
blockingQueue.put(NO_MORE_ITEMS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ProducerConsumer
package filevisitor;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class ProducerConsumer {
private Path path;
private JavaFileVisitor fileVisitor;
public ProducerConsumer(Path path, JavaFileVisitor visitor) {
this.path = path;
this.fileVisitor = visitor;
}
public void produce() throws IOException, InterruptedException {
Files.walkFileTree(path, fileVisitor);
fileVisitor.setNoMoreItems();
}
public void consume() throws InterruptedException {
while (true) {
String item = fileVisitor.getQueueItem();
if(JavaFileVisitor.NO_MORE_ITEMS.equals(item)) {
break;
}
System.out.println(item);
}
}
}
ProducerConsumerMain
package filevisitor;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ProducerConsumerMain {
public static void main(String[] args) {
Path startingDir = Paths.get("src/filevisitor");
JavaFileVisitor fileVisitor = new JavaFileVisitor();
ExecutorService executor = Executors.newCachedThreadPool();
final ProducerConsumer producerConsumer = new ProducerConsumer(startingDir, fileVisitor);
executor.submit(new Runnable() {
public void run() {
System.out.println("Producer started");
try {
producerConsumer.produce();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Producer finished");
}
});
for (int j = 0; j < 5; j++) {
executor.submit(new Runnable() {
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " Consumer Started");
try {
producerConsumer.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(threadName + " Consumer finished");
}
});
}
executor.shutdown();
System.out.println("Executor shutdown, waiting for threads to finish");
try {
executor.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Exiting main");
}
}
Output
Producer started
pool-1-thread-3 Consumer Started
pool-1-thread-2 Consumer Started
Executor shutdown, waiting for threads to finish
pool-1-thread-5 Consumer Started
pool-1-thread-6 Consumer Started
pool-1-thread-4 Consumer Started
src\filevisitor\JavaFileVisitor.java
src\filevisitor\ProducerConsumerMain.java
src\filevisitor\ProducerConsumer.java
pool-1-thread-6 Consumer finished
pool-1-thread-4 Consumer finished
pool-1-thread-3 Consumer finished
pool-1-thread-5 Consumer finished
Producer finished
pool-1-thread-2 Consumer finished
Exiting main