In a recently deleted post, I asked the following question:
I am trying to write a multi-threaded program which implements the Producer/Consumer model. Typically, I want to use one Producer which reads lines from a file and puts them in a BlockingQueue, and have multiple Consumers do some processing after retrieving the lines from the BlockingQueue and store the results in a new file.
I am hoping you can give me some feedback on what I should consider to achieve high performance. I've spent weeks reading about concurrency and synchronization because I don't want to miss anything, but I am looking for some external feedback. Please find below the points I need information about.
I hope I didn't say anything wrong.
You advised me to implement something before asking questions, so I deleted the post and tried to implement the model. Here is my code.
The Producer, where a single thread reads lines from a file and puts them in a BlockingQueue:
class Producer implements Runnable {
private String location;
private BlockingQueue<String> blockingQueue;
// A long counts lines exactly; a float loses precision past 2^24
private long numline=0;
protected transient BufferedReader bufferedReader;
protected transient BufferedWriter bufferedWriter;
public Producer (String location, BlockingQueue<String> blockingQueue) {
this.location=location;
this.blockingQueue=blockingQueue;
try {
bufferedReader = new BufferedReader(new FileReader(location));
// Create the file where the processed lines will be stored
createCluster();
} catch (FileNotFoundException e1) {
e1.printStackTrace();
}
}
@Override
public void run() {
String line=null;
try {
while ((line = bufferedReader.readLine()) != null) {
// Count the read lines
numline++;
blockingQueue.put(line);
}
} catch (IOException e) {
System.out.println("Problem reading the log file!");
e.printStackTrace();
} catch (InterruptedException e) {
// Restore the interrupt flag so the owning thread can see the interruption
Thread.currentThread().interrupt();
}
}
public void createCluster () {
try {
String clusterName=location+".csv";
bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
bufferedWriter.write("\n");
} catch (IOException e) {
e.printStackTrace();
}
}
}
The Consumer, where multiple threads take lines from the BlockingQueue, do some processing 'f()', and store the results in a new file:
class Consumer implements Runnable {
private String location;
private BlockingQueue<String> blockingQueue;
protected transient BufferedWriter bufferedWriter;
private String clusterName;
public Consumer (String location, BlockingQueue<String> blockingQueue) {
this.blockingQueue=blockingQueue;
this.location=location;
clusterName=location+".csv";
}
@Override
public void run() {
while (true) {
try {
//Retrieve the lines
String line = blockingQueue.take();
String result = doNormalize (line);
// TO DO
//
//bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
//bufferedWriter.write(result+ "\n");
} catch (InterruptedException e) {
// Restore the interrupt flag and stop instead of looping forever
Thread.currentThread().interrupt();
return;
}
}
}
//Pattern pattern, Matcher matcher
private String doNormalize(String line){
String rules [] = getRules(); // return an array of Regex
String tmp="";
for (String rule : rules) {
Pattern pattern = Pattern.compile(rule);
Matcher matcher = pattern.matcher(line);
if (matcher.find()){
Set<String> namedGroups = getNamedGroupCandidates(rule);
Iterator<String> itr = namedGroups.iterator();
while(itr.hasNext()){
String value=itr.next();
tmp=tmp+matcher.group(value)+", ";
}
tmp = tmp + "\t";
break;
}
}
return tmp;
}
private Set<String> getNamedGroupCandidates(String regex) {
Set<String> namedGroups = new TreeSet<String>();
Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
while (m.find()) {
namedGroups.add(m.group(1));
}
return namedGroups;
}
}
And here is the code in my main class, which uses 1 Producer and 3 Consumers:
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
Producer readingThread = new Producer(location, queue);
new Thread(readingThread).start();
Consumer normalizers = new Consumer(location,queue);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 3; i++) {
executor.submit(normalizers);
}
System.out.println("Stopped");
executor.shutdown();
I know my code is incomplete, since I still need to flush and close the reader and writers, etc. But can you tell me the mistakes I have made so far in implementing the Producer/Consumer model? Also, regarding the method f(): it is a method that processes a line and produces a result. I don't think I should synchronize it, because I want all the consumers to use it at the same time.
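On the question of synchronizing f(): a compiled java.util.regex.Pattern is immutable and safe to share between threads, while each Matcher must stay local to the thread that created it. Here is a minimal sketch of doNormalize under that assumption, compiling every rule once instead of on every line. The class name Normalizer and the two regexes in RULES are hypothetical placeholders for whatever getRules() returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class Normalizer {
    // Hypothetical rules; in the real program these come from getRules().
    private static final String[] RULES = {
        "(?<user>[a-z]+) logged in from (?<ip>[0-9.]+)",
        "(?<level>ERROR|WARN): (?<message>.*)"
    };

    // Finds named groups such as (?<user>...) inside a rule.
    private static final Pattern GROUP_NAME =
            Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>");

    // One compiled pattern plus its sorted group names, built once per rule.
    private static final class Rule {
        final Pattern pattern;
        final Set<String> groups = new TreeSet<>();

        Rule(String regex) {
            pattern = Pattern.compile(regex);
            Matcher m = GROUP_NAME.matcher(regex);
            while (m.find()) {
                groups.add(m.group(1));
            }
        }
    }

    // Read-only after class initialization, so sharing it needs no locking.
    private static final List<Rule> COMPILED = new ArrayList<>();
    static {
        for (String regex : RULES) {
            COMPILED.add(new Rule(regex));
        }
    }

    // Safe to call from many threads: the Matcher is a local variable.
    static String normalize(String line) {
        for (Rule rule : COMPILED) {
            Matcher matcher = rule.pattern.matcher(line);
            if (matcher.find()) {
                StringBuilder sb = new StringBuilder();
                for (String group : rule.groups) {
                    sb.append(matcher.group(group)).append(", ");
                }
                return sb.append('\t').toString();
            }
        }
        return "";
    }
}
```

With this shape, every consumer can call Normalizer.normalize(line) concurrently without a synchronized block, because the only mutable state is created inside the method.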
EDIT
Finally, this post really confused me: it suggests that if the Consumers store the results in a file, it will slow down the process. This might be a problem because I want performance and speed.
Best regards,
For my second problem, "the SingleConsumer needs to know that the multiple consumers have finished consuming/processing all the lines", I was inspired by this post combined with this comment: each consumer should send an "I terminated" message to queue 2, and once the single output consumer has received all of these, it can also terminate.
So, for the Consumers, here is what I wrote in the run() method:
@Override
public void run() {
// A Consumer keeps taking elements from the queue 1, as long as the Producer is
// producing and as long as queue 1 is not empty.
while (true) {
try {
//Retrieve the lines
String line = firstBlockingQueue.take();
// If the special terminating value is found
if (line==POISON_PILL) {
// The consumer notifies the other consumers and the SingleConsumer that operates on queue 2
// and then terminates.
firstBlockingQueue.put(POISON_PILL);
secondBlockingQueue.put(SINGLE_POISIN_PILL);
return;
}
// Put the normalized events on the new Queue
String result = doNormalize (line);
if (result!=null) {
secondBlockingQueue.put(result);
}
} catch (InterruptedException e) {
// Restore the interrupt flag and stop instead of looping forever
Thread.currentThread().interrupt();
return;
}
}
}
As for the SingleConsumer, it counts the "I finished processing" messages sent by the Consumers (I use SINGLE_POISIN_PILL for this) and terminates when that counter reaches the number of consumers on queue 1.
while (true) {
try {
//Retrieve the lines
String line = secondBlockingQueue.take();
if (line==SINGLE_POISIN_PILL) {
setCounter(getCounter()+1);
if (getCounter()== threadNumber) {
System.out.println("All "+getCounter()+" threads have finished. \n Stopping..");
return;
}
}
try {
if (line != SINGLE_POISIN_PILL) {
System.out.println(line);
bufferedWriter.write(line+"\n");
}
} catch (IOException e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
// Restore the interrupt flag and stop instead of looping forever
Thread.currentThread().interrupt();
return;
}
}
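To see the whole protocol in one place, here is a minimal end-to-end sketch of the two-queue design described above: one producer, several consumers on queue 1, and a single collector on queue 2 that counts the termination messages. It is only an illustration under some assumptions: the class name PipelineSketch, the DONE marker, the in-memory input and output lists, and the toUpperCase() call standing in for doNormalize are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PipelineSketch {
    // new String(...) yields unique instances, so == never matches real data.
    private static final String POISON_PILL = new String("POISON_PILL");
    private static final String DONE = new String("DONE");

    static List<String> run(List<String> input, int consumers) {
        BlockingQueue<String> first = new ArrayBlockingQueue<>(100);
        BlockingQueue<String> second = new ArrayBlockingQueue<>(100);
        List<String> output = new ArrayList<>(); // written by the collector only
        ExecutorService pool = Executors.newFixedThreadPool(consumers + 2);

        // Producer: publish every line, then a single poison pill.
        pool.submit(() -> {
            try {
                for (String line : input) {
                    first.put(line);
                }
                first.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Consumers: process lines; on the pill, pass it on and report DONE.
        for (int i = 0; i < consumers; i++) {
            pool.submit(() -> {
                try {
                    while (true) {
                        String line = first.take();
                        if (line == POISON_PILL) {
                            first.put(POISON_PILL); // wake the next consumer
                            second.put(DONE);       // tell the collector
                            return;
                        }
                        second.put(line.toUpperCase()); // placeholder for doNormalize
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Collector: stop once every consumer has reported DONE.
        pool.submit(() -> {
            int finished = 0;
            try {
                while (finished < consumers) {
                    String line = second.take();
                    if (line == DONE) {
                        finished++;
                    } else {
                        output.add(line);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pipeline did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return output; // results arrive in no particular order
    }
}
```

Because each consumer puts its DONE marker only after its last result, the collector has received every result by the time the counter reaches the number of consumers. Waiting with awaitTermination also avoids the main thread reporting completion while the workers are still running.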
For my second problem, apparently all I had to do was add the following:
if (line==SINGLE_POISIN_PILL) {
setCounter(getCounter()+1);
if (getCounter()== threadNumber) {
System.out.println("All "+getCounter()+" threads have finished. \n Stopping..");
try {
if (bufferedWriter != null)
{
bufferedWriter.flush();
bufferedWriter.close();
}
} catch (IOException e) {
e.printStackTrace();
}
return;
}
}
Once I flushed and closed the BufferedWriter, the output started appearing in the file.
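The reason is that a BufferedWriter keeps characters in memory until its buffer fills up or it is flushed or closed. A minimal sketch of the same fix using try-with-resources, so that close() (which also flushes) runs even if an exception is thrown, could look like this. The class name WriterSketch and the temporary file are assumptions made only to keep the example self-contained.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class WriterSketch {
    // Writes the lines to a temporary file and reads them back.
    static List<String> writeAndReadBack(List<String> lines) {
        try {
            Path file = Files.createTempFile("cluster", ".csv");
            try (BufferedWriter writer =
                     Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
                for (String line : lines) {
                    writer.write(line);
                    writer.newLine();
                }
            } // leaving the block closes the writer, which flushes the buffer
            List<String> result = Files.readAllLines(file, StandardCharsets.UTF_8);
            Files.delete(file);
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the SingleConsumer, the equivalent would be to open the writer in a try-with-resources block around the whole take() loop, so the explicit flush() and close() calls are no longer needed.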
Hoping for your feedback.