I'm using parallelStream to upload files in parallel; some are large and some are small. I've noticed that not all workers are being used.
Everything runs fine at first, with all threads in use (I set the parallelism to 16). Then at a certain point (once it gets to the larger files), it drops down to using only one thread.
simplified code:
files.parallelStream().forEach(file -> {
    try (FileInputStream fileInputStream = new FileInputStream(file)) {
        IDocumentStorageAdaptor uploader = null;
        try {
            logger.debug("Adaptors before taking: " + uploaderPool.size());
            uploader = uploaderPool.take();
            logger.debug("Took an adaptor!");
            logger.debug("Adaptors after taking: " + uploaderPool.size());
            uploader.addNewFile(file);
        } finally {
            if (uploader != null) {
                logger.debug("Adding one back!");
                uploaderPool.put(uploader);
                logger.debug("Adaptors after putting: " + uploaderPool.size());
            }
        }
    } catch (InterruptedException | IOException e) {
        throw new UploadException(e);
    }
});
uploaderPool is an ArrayBlockingQueue. Logs:
[ForkJoinPool.commonPool-worker-8] - Adaptors before taking: 0
[ForkJoinPool.commonPool-worker-15] - Adding one back!
[ForkJoinPool.commonPool-worker-8] - Took an adaptor!
[ForkJoinPool.commonPool-worker-15] - Adaptors after putting: 0
...
...
...
[ForkJoinPool.commonPool-worker-10] - Adding one back!
[ForkJoinPool.commonPool-worker-10] - Adaptors after putting: 16
[ForkJoinPool.commonPool-worker-10] - Adaptors before taking: 16
[ForkJoinPool.commonPool-worker-10] - Took an adaptor!
[ForkJoinPool.commonPool-worker-10] - Adaptors after taking: 15
[ForkJoinPool.commonPool-worker-10] - Adding one back!
[ForkJoinPool.commonPool-worker-10] - Adaptors after putting: 16
[ForkJoinPool.commonPool-worker-10] - Adaptors before taking: 16
[ForkJoinPool.commonPool-worker-10] - Took an adaptor!
[ForkJoinPool.commonPool-worker-10] - Adaptors after taking: 15
It seems like all the work (items in the list) is divided among the 16 threads up front, and items assigned to a busy thread just wait for that thread to free up rather than being picked up by an available one. Is there a way to change how parallelStream queues its work? I read the ForkJoinPool docs, and they mention work-stealing, but only for spawned subtasks.
My other plan was to randomize the order of the list I'm calling parallelStream on, in the hope that would balance things out.
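The shuffling idea would look something like this (a minimal sketch; the class and file names are placeholders, and the real list would hold File objects):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ShuffleFiles {
    // Shuffling a copy of the list spreads the large files across the
    // chunks that parallelStream splits off up front, so no single
    // worker's chunk ends up holding all of the big uploads.
    static <T> List<T> shuffled(List<T> files) {
        List<T> copy = new ArrayList<>(files);
        Collections.shuffle(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<String> files = List.of("big.iso", "a.txt", "b.txt", "c.txt");
        System.out.println(shuffled(files).size()); // prints 4
    }
}
```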
Thanks!
The split-vs-compute heuristics for parallel streams are tuned for data-parallel operations, not for IO-parallel operations. (In other words, they are tuned to keep the CPUs busy, but not to generate way more tasks than you have CPUs.) As a result, they are biased towards computation over forking. There are currently no options for overriding these choices.
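A common workaround for IO-bound work is to skip parallelStream entirely and submit one task per file to a plain fixed-size ExecutorService: every task sits in a shared queue, so an idle thread always picks up the next pending file instead of waiting on a pre-assigned chunk. A minimal sketch (the method name, the placeholder task body, and the sample file names are illustrative, not the original code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class UploadExecutor {
    // Submits one task per file to a fixed-size pool. A slow (large)
    // upload never blocks files that parallelStream would have bound
    // to the same worker when it split the source up front.
    static List<String> uploadAll(List<String> files, int threads) throws InterruptedException {
        List<String> done = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (String file : files) {
            pool.submit(() -> {
                // placeholder for: take an adaptor, addNewFile(file), put it back
                done.add(file);
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return done;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> uploaded = uploadAll(List.of("small.pdf", "big.iso", "notes.txt"), 16);
        System.out.println(uploaded.size()); // prints 3
    }
}
```

The pool size then caps IO concurrency directly, independent of CPU count, which is usually what you want for uploads.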