My ultimate goal is to achieve batch processing of lines in an Executor Service. I have the following snippet of code:
while ((bLine = bufferedReader.readLine()) != null) {
// We will use an array to hold 100 lines, so that we can batch process in a
// single thread
batchArray.add(bLine);
switch (batchArray.size()) {
case 100:
Future<?> future = executor.submit(new LocalThreadPoolExecutor(batchArray, closeableHttpClient, httpPost));
futures.add(future);
// batchArray.clear() <--- point of failure
break;
default:
logger.info("Batch size in switch: "+batchArray.size());
}
}
Now if I do an batchArray.clear()
in the case 100
I get a concurrentModificationException. Unable to determine how can I reinit the array list and send 100 lines to my executor as they are read from file.
below is the stacktrace:
java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:859)
at java.util.ArrayList$Itr.next(ArrayList.java:831)
at consumer.ril.com.LocalThreadPoolExecutor.run(LocalThreadPoolExecutor.java:37)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I get thie exception in my LocalThreadPoolExecutor
class when I try to read the batchArray, which is passed in the constructor of this class.
Simple solution -- you need pass to LocalThreadPoolExecutor copy of array and clean original array.
Future<?> future = executor.submit(new LocalThreadPoolExecutor(new ArrayList<>
(batchArray), closeableHttpClient, httpPost));
futures.add(future);
batchArray.clear();