Here is my code:
public class BigFileReader implements Runnable {
private final String fileName;
int a = 0;
private final BlockingQueue<String> linesRead;
public BigFileReader(String fileName, BlockingQueue<String> linesRead) {
this.fileName = fileName;
this.linesRead = linesRead;
}
@Override
public void run() {
try {
//since it is a sample, I avoid the manage of how many lines you have read
//and that stuff, but it should not be complicated to accomplish
BufferedReader br = new BufferedReader(new FileReader(new File("E:/Amazon HashFile/Hash.txt")));
String str = "";
while((str=br.readLine())!=null)
{
linesRead.put(str);
System.out.println(a);
a++;
}
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println("Completed");
}
}
public class BigFileProcessor implements Runnable {
private final BlockingQueue<String> linesToProcess;
public BigFileProcessor (BlockingQueue<String> linesToProcess) {
this.linesToProcess = linesToProcess;
}
@Override
public void run() {
String line = "";
try {
while ( (line = linesToProcess.take()) != null) {
//do what you want/need to process this line...
String [] pieces = line.split("(...)/g");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class BigFileWholeProcessor {
private static final int NUMBER_OF_THREADS = 2;
public void processFile(String fileName) {
BlockingQueue<String> fileContent = new LinkedBlockingQueue<String>();
BigFileReader bigFileReader = new BigFileReader(fileName, fileContent);
BigFileProcessor bigFileProcessor = new BigFileProcessor(fileContent);
ExecutorService es = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
es.execute(bigFileReader);
es.execute(bigFileProcessor);
es.shutdown();
}
}
This code works, but it has one major problem: the thread never ends! Even after the entire file has been processed, I can see that the program is still alive. What is wrong here?
BlockingQueue.take() will block until an element is available:
Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
So after BigFileReader has finished reading the input file and placing lines on the BlockingQueue, BigFileProcessor will wait forever in the take() method for new input.
You might want to find a way to signal to BigFileProcessor that there will be no more input ever put on the BlockingQueue, possibly by adding a sentinel value to the queue or finding some other way to tell BigFileProcessor to stop calling take().
An example of the sentinel approach:
public class BigFileReader implements Runnable {
public static final String SENTINEL = "SENTINEL"; //the actual value isn't very important
...
while((str=br.readLine())!=null) {
linesRead.put(str);
}
//when reading the file is done, add SENTINEL to the queue
linesRead.put(SENTINEL);
}
//inside BigFileProcessor...
while ( (line = linesToProcess.take()) != null) {
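// check if value in queue is the sentinel; == works here because the reader
// puts this exact object on the queue (use equals() if that may not hold)
if (line == BigFileReader.SENTINEL) {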
//break out of the while loop
break;
}
//otherwise process the line as normal
}
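For reference, here is a minimal, self-contained sketch of the sentinel idea that can be run on its own. The class name SentinelSketch and the helper runPipeline are hypothetical, and an in-memory list stands in for the file, so treat it as an illustration under those assumptions rather than a drop-in replacement for the classes above. It creates the sentinel with new String(...), so the identity check cannot match a real line whose text happens to be "SENTINEL".

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

class SentinelSketch {
    // new String(...) creates a distinct object, so the == check below can
    // never match a real line, even one whose text is "SENTINEL".
    private static final String SENTINEL = new String("SENTINEL");

    // Hypothetical helper: pushes the lines through a queue and returns how
    // many lines the consumer saw before it met the sentinel.
    static int runPipeline(List<String> lines) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService es = Executors.newFixedThreadPool(2);
        try {
            // Producer: stands in for BigFileReader.
            es.submit(() -> {
                try {
                    for (String line : lines) {
                        queue.put(line);
                    }
                } finally {
                    // Always signal the end, even if producing failed.
                    queue.put(SENTINEL);
                }
                return null;
            });
            // Consumer: stands in for BigFileProcessor.
            Future<Integer> consumed = es.submit(() -> {
                int count = 0;
                while (true) {
                    String line = queue.take();
                    if (line == SENTINEL) {
                        break; // no more input will arrive
                    }
                    count++; // process the line here
                }
                return count;
            });
            return consumed.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // Both tasks have finished, so the pool threads can exit.
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runPipeline(Arrays.asList("a", "b", "c")));
    }
}
```

With more than one consumer thread, one sentinel per consumer would be needed, since each sentinel is removed from the queue by the thread that takes it.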
Another approach could be to use the overload of poll that takes a timeout value instead of take(), and have BigFileProcessor break out of its loop if it can't read anything from the queue for more than N seconds.
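A rough sketch of the timeout variant follows. The class name PollTimeoutSketch and the helper drainUntilIdle are made up for illustration, and the 200 ms timeout is an arbitrary choice. BlockingQueue.poll(long, TimeUnit) returns null when nothing arrives within the timeout, which is what ends the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PollTimeoutSketch {
    // Hypothetical helper: consumes lines until the queue has been idle
    // for the given timeout, then returns everything it consumed.
    static List<String> drainUntilIdle(BlockingQueue<String> queue,
                                       long timeout, TimeUnit unit) {
        List<String> processed = new ArrayList<>();
        try {
            String line;
            // poll(timeout, unit) returns null if nothing arrived in time.
            while ((line = queue.poll(timeout, unit)) != null) {
                processed.add(line); // process the line here
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop consuming.
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("first line");
        queue.add("second line");
        System.out.println(drainUntilIdle(queue, 200, TimeUnit.MILLISECONDS));
    }
}
```

The trade-off is that a slow reader can look the same as a finished one: if the producer pauses for longer than the timeout, the consumer stops early. The sentinel approach does not have that problem.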