My goal is to limit memory usage when processing a big file. To do that, I'm using a thread pool configured so that it is impossible to load more data from the file than is being processed at any given time.
try (CSVParser parser = new CSVParser(new File("...."))) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 5, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1), new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            r.run();
        }
    });
    for (Item item; (item = parser.nextItem()) != null;) {
        executor.submit(new ItemsProcessor(item));
    }
    executor.shutdown();
    executor.awaitTermination(12, TimeUnit.HOURS);
} catch (Exception e) {
    e.printStackTrace();
}
My understanding is that RejectedExecutionHandler's rejectedExecution method will run on the main thread, the thread on which the ThreadPoolExecutor was created. Is that so?
Are the rejected tasks run on the same thread that created the thread pool?
As I understand it, this approach should hold at most 12 items in memory: 10 being processed by the pool's threads, one waiting in the pool's queue, and one that was rejected and is being run on the same thread as the loop (which pauses the loop).
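You are right that in your code the RejectedExecutionHandler runs on the main thread. More precisely, rejectedExecution is invoked on the thread that calls submit (or execute), which is not necessarily the thread that created the pool; in your code the two are the same. The test below uses a pool with one thread and a queue of two, so three of the five submitted tasks are accepted and two are rejected.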
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestRejectedExecution
{
    public static void main( String[] args )
    {
        // Task that reports which thread runs it, then sleeps to keep the pool busy.
        Runnable r = () -> {
            Thread cur = Thread.currentThread();
            System.out.println( String.format( "in runnable, thread id: %s, name: %s, group name %s",
                cur.getId(), cur.getName(), cur.getThreadGroup().getName() ) );
            try
            {
                Thread.sleep( 5000 );
            }
            catch ( InterruptedException e )
            {
                e.printStackTrace();
            }
        };
        Thread cur = Thread.currentThread();
        System.out.println( String.format( "in main, thread id: %s, name: %s, group name %s",
            cur.getId(), cur.getName(), cur.getThreadGroup().getName() ) );
        try {
            // One worker thread and a queue of two: at most three tasks are accepted at once.
            ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MINUTES, new ArrayBlockingQueue<>( 2 ),
                ( r1, executor1 ) -> {
                    // The handler only reports its thread; it does not run the rejected task.
                    Thread cur1 = Thread.currentThread();
                    System.out.println( String.format( "in REH, thread id: %s, name: %s, group name %s",
                        cur1.getId(), cur1.getName(), cur1.getThreadGroup().getName() ) );
                } );
            for ( int i = 0; i < 5; i++ ) {
                executor.submit( r );
            }
            executor.shutdown();
            executor.awaitTermination( 1, TimeUnit.MINUTES );
        }
        catch ( Exception e )
        {
            e.printStackTrace();
        }
    }
}
Here is the output:
in main, thread id: 1, name: main, group name main
in REH, thread id: 1, name: main, group name main
in REH, thread id: 1, name: main, group name main
in runnable, thread id: 11, name: pool-1-thread-1, group name main
in runnable, thread id: 11, name: pool-1-thread-1, group name main
in runnable, thread id: 11, name: pool-1-thread-1, group name main
Process finished with exit code 0
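As a side note, the JDK ships ThreadPoolExecutor.CallerRunsPolicy, which does essentially what the hand-written handler in the question does: it runs the rejected task on the submitting thread (it discards the task if the executor has already been shut down). Below is a minimal sketch of how the memory bound from the question could be checked with it. The class name CallerRunsDemo, the Result holder, the runDemo method, and the 50 ms sleep are all made up for illustration; the counter increment merely stands in for parser.nextItem().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsDemo {

    /** Counters collected by one run (hypothetical helper type). */
    public static final class Result {
        public final int maxInFlight;  // most items "loaded" but not yet finished
        public final int completed;    // tasks that ran to completion
        public final int ranOnCaller;  // tasks executed by the submitting thread

        Result(int maxInFlight, int completed, int ranOnCaller) {
            this.maxInFlight = maxInFlight;
            this.completed = completed;
            this.ranOnCaller = ranOnCaller;
        }
    }

    public static Result runDemo(int poolSize, int queueCapacity, int taskCount) {
        final Thread caller = Thread.currentThread();
        final AtomicInteger inFlight = new AtomicInteger();
        final AtomicInteger maxInFlight = new AtomicInteger();
        final AtomicInteger completed = new AtomicInteger();
        final AtomicInteger ranOnCaller = new AtomicInteger();

        // Bounded queue plus CallerRunsPolicy: when the pool and the queue are
        // full, the submitting thread runs the task itself and stops submitting.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                poolSize, poolSize, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < taskCount; i++) {
            // Stands in for parser.nextItem(): one more item is now in memory.
            maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            executor.execute(() -> {
                try {
                    if (Thread.currentThread() == caller) {
                        ranOnCaller.incrementAndGet();
                    }
                    Thread.sleep(50); // simulated processing
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet(); // the item can be garbage collected
                }
            });
        }

        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Result(maxInFlight.get(), completed.get(), ranOnCaller.get());
    }

    public static void main(String[] args) {
        Result r = runDemo(2, 1, 20);
        System.out.println("max in flight: " + r.maxInFlight
                + ", completed: " + r.completed
                + ", ran on caller: " + r.ranOnCaller);
    }
}
```

With poolSize workers and a queue of queueCapacity, the number of items in flight should never exceed poolSize + queueCapacity + 1, which matches the 10 + 1 + 1 = 12 reasoning in the question.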