I would like to read a file line by line, do something slow with each line that can easily be done in parallel, and write the result to a file line by line. I don't care about the order of the output. The input and output are so big they don't fit in memory. I would like to be able to set a hard limit on the number of threads running at the same time as well as the number of lines in memory.
The library I use for file I/O (Apache Commons CSV) does not seem to offer synchronised file access, so I don't think I can read from or write to the same file from several threads at once. If that were possible, I would create a ThreadPoolExecutor and feed it a task for each line, which would simply read the line, perform the calculation and write the result.
Instead, what I think I need is a single thread that does the parsing, a bounded queue for the parsed input lines, a thread pool with jobs that do the calculations, a bounded queue for the calculated output lines, and a single thread that does the writing. A producer, a lot of consumer-producers and a consumer if that makes sense.
What I have looks like this:
BlockingQueue<CSVRecord> inputQueue = new ArrayBlockingQueue<CSVRecord>(INPUT_QUEUE_SIZE);
BlockingQueue<String[]> outputQueue = new ArrayBlockingQueue<String[]>(OUTPUT_QUEUE_SIZE);
Thread parserThread = new Thread(() -> {
while (inputFileIterator.hasNext()) {
CSVRecord record = inputFileIterator.next();
inputQueue.put(record); // blocks if queue is full
}
});
// the job queue of the thread pool has to be bounded too, otherwise all
// the objects in the input queue will be given to jobs immediately and
// I'll run out of heap space
// source: https://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated
BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler
= new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService
= new ThreadPoolExecutor(
NUMBER_OF_THREADS,
NUMBER_OF_THREADS,
0L,
TimeUnit.MILLISECONDS,
jobQueue,
rejectedExecutionHandler
);
Thread consumerBossThread = new Thread(() -> {
while (!inputQueue.isEmpty() || parserThread.isAlive()) {
CSVRecord record = inputQueue.take(); // blocks if queue is empty
executorService.execute(() -> {
String[] array = this.doStuff(record);
outputQueue.put(array); // blocks if queue is full
});
}
// getting here means that all CSV rows have been read and
// added to the processing queue
executorService.shutdown(); // do not accept any new tasks
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
// wait for existing tasks to finish
});
Thread writerThread = new Thread(() -> {
while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {
String[] outputRow = outputQueue.take(); // blocks if queue is empty
outputFileWriter.printRecord((Object[]) outputRow);
}
});
parserThread.start();
consumerBossThread.start();
writerThread.start();
// wait until writer thread has finished
writerThread.join();
I've left out the logging and exception handling so this looks a lot shorter than it is.
This solution works but I'm not happy with it. It seems hacky to have to create my own threads, check their isAlive(), create a Runnable within a Runnable, be forced to specify a timeout when I really just want to wait until all the workers have finished, etc. All in all it's a 100+ line method, or even several hundred lines of code if I make the Runnables their own classes, for what seems like a very basic pattern.
Is there a better solution? I'd like to make use of Java's libraries as much as possible, to help keep my code maintainable and in line with best practices. I would still like to know what it's doing under the hood, but I doubt that implementing all this myself is the best way to do it.
Update: Better solution, after suggestions from the answers:
BlockingQueue<Runnable> jobQueue = new ArrayBlockingQueue<Runnable>(JOB_QUEUE_SIZE);
RejectedExecutionHandler rejectedExecutionHandler
= new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService executorService
= new ThreadPoolExecutor(
NUMBER_OF_THREADS,
NUMBER_OF_THREADS,
0L,
TimeUnit.MILLISECONDS,
jobQueue,
rejectedExecutionHandler
);
while (it.hasNext()) {
CSVRecord record = it.next();
executorService.execute(() -> {
String[] array = this.doStuff(record);
synchronized (writer) {
writer.printRecord((Object[]) array);
}
});
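}
// wait for the remaining tasks to finish before closing the writer
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);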
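For anyone who wants to try this pattern without Commons CSV, here is a minimal, self-contained sketch of the same idea. It is not the exact code from my project: plain strings stand in for CSVRecord, a shared List stands in for the CSV writer, and the names BoundedLineProcessor, process and slowTransform are made up for illustration. It also shuts the pool down and waits for the workers, a step that matters before the writer is closed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedLineProcessor {

    // Hypothetical stand-in for doStuff(record): any slow, independent computation.
    static String slowTransform(String line) {
        return line.toUpperCase();
    }

    // Transforms each line on at most `threads` pool threads and appends the
    // results to `sink` in arbitrary order.
    static void process(Iterator<String> lines, List<String> sink, int threads, int queueSize) {
        ExecutorService pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize),
                // When the job queue is full, the submitting thread runs the
                // task itself, which throttles reading.
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            while (lines.hasNext()) {
                String line = lines.next();
                pool.execute(() -> {
                    String result = slowTransform(line);
                    synchronized (sink) { // one writer at a time
                        sink.add(result);
                    }
                });
            }
        } finally {
            pool.shutdown(); // accept no new tasks
        }
        try {
            // Wait for queued and running tasks before the caller closes the sink.
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        process(Arrays.asList("a", "b", "c").iterator(), out, 2, 2);
        System.out.println(out.size() + " lines written");
    }
}
```

With this setup the number of lines held at once is roughly bounded by the queue size plus the number of pool threads plus one (the line the reading thread is handling when CallerRunsPolicy kicks in).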
First, I would like to point out that I can think of three possible scenarios:
1.- For every line of the file, processing the line with the doStuff method takes longer than reading that line from disk and parsing it.
2.- For every line of the file, processing the line with the doStuff method takes less time than, or the same time as, reading and parsing it.
3.- Neither of the above holds for the whole file: some lines are slower to process than to read, others are not.
Your solution should be good for the first scenario, but not for the second or third. Also, you are not modifying the queues in a synchronized way. Moreover, in a scenario like number 2 you waste CPU cycles whenever there is no data to send to the output, or no lines waiting in the queue to be processed by doStuff, by spinning at:
while (!outputQueue.isEmpty() || consumerBossThread.isAlive()) {
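One possible way to avoid depending on isEmpty() and isAlive() in that loop condition is an end-of-stream marker (often called a "poison pill") placed on the queue once the producer is done. The following is only a sketch under assumptions: the END marker, the class SentinelQueueDemo and the methods drain and runDemo are hypothetical names, and a list stands in for the CSV writer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class SentinelQueueDemo {

    // Hypothetical end-of-stream marker; compared by identity, never written out.
    static final String[] END = new String[0];

    // Takes rows until the END marker arrives; take() blocks while the queue is empty.
    static List<String[]> drain(BlockingQueue<String[]> queue) throws InterruptedException {
        List<String[]> written = new ArrayList<>();
        while (true) {
            String[] row = queue.take();
            if (row == END) {
                return written;
            }
            written.add(row); // stand-in for outputFileWriter.printRecord(...)
        }
    }

    // Runs one producer thread and drains the queue on the calling thread.
    static int runDemo(int rows) {
        BlockingQueue<String[]> queue = new ArrayBlockingQueue<>(4);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < rows; i++) {
                    queue.put(new String[] {"row", Integer.toString(i)});
                }
                queue.put(END); // signal completion exactly once
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            int drained = drain(queue).size();
            producer.join();
            return drained;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo(10) + " rows drained");
    }
}
```

When several workers feed the same queue, as with a thread pool, the marker would have to be added only after all of them have finished, for example after awaitTermination returns.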
Finally, regardless of which scenario you are in, I would suggest using monitor objects, which let you put specific threads into a waiting state until another thread notifies them that a certain condition holds and they can resume. With monitor objects you do not waste CPU cycles.
For more information, see: https://docs.oracle.com/javase/7/docs/api/javax/management/monitor/Monitor.html
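To make the waiting and notifying idea concrete, here is a small sketch of a bounded buffer. It assumes that "monitor" means Java's built-in per-object monitor (synchronized together with Object.wait and Object.notifyAll); the class MonitorBuffer and its demo method are hypothetical names used only for illustration. The BlockingQueue implementations in java.util.concurrent already provide this blocking behaviour, so in practice this would rarely need to be hand-written.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class MonitorBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    MonitorBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Blocks while the buffer is full; a waiting thread does not spin.
    synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait(); // releases the monitor until another thread notifies
        }
        items.addLast(item);
        notifyAll(); // wake threads waiting for data
    }

    // Blocks while the buffer is empty.
    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.removeFirst();
        notifyAll(); // wake threads waiting for free space
        return item;
    }

    // Hypothetical demo: one producer puts 0..n-1, the caller sums what it takes.
    static long demo(int n) {
        MonitorBuffer<Integer> buffer = new MonitorBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum = " + demo(100));
    }
}
```

The wait() calls sit inside while loops rather than if statements so that a thread re-checks its condition after every wake-up.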
EDIT: I've deleted the suggestion to use synchronized methods since, as you pointed out, BlockingQueue's methods are thread-safe (or almost all of them are) and prevent race conditions.