I've noticed that the spliterator produced by Guava's Iterables.partition(collection, partitionSize).spliterator()
behaves strangely.
Executing trySplit() on the resultant spliterator doesn't split, but executing trySplit() on the result of the initial trySplit() finally does.
Furthermore, using StreamSupport.stream(Iterables.partition(collection, partitionSize).spliterator(), true)
does not parallelize the stream, but
StreamSupport.stream(Iterables.partition(collection, partitionSize).spliterator().trySplit(), true)
does parallelize and the resultant stream contains all of the partitions.
My goal is: given a collection of size 100k, I want to partition it into batches of size 5000 and process those batches in parallel.
Two questions: does the spliterator generated by Iterables.partition behave correctly? Is my approach a good way to achieve my goal?
The problem here is that the Spliterator comes from an Iterable, which does not have a known size. So each trySplit() internally copies elements into an array, starting with a batch of 1024 and growing the batch by another 1024 on every subsequent split. What I mean by that is:
List<Integer> coll = IntStream.range(0, 150_000).boxed().collect(Collectors.toList());
Iterable<List<Integer>> it = Iterables.partition(coll, 1);
Spliterator<List<Integer>> sp = it.spliterator();
Spliterator<List<Integer>> one = sp.trySplit();
System.out.println(one.getExactSizeIfKnown());
Spliterator<List<Integer>> two = sp.trySplit();
System.out.println(two.getExactSizeIfKnown());
Spliterator<List<Integer>> three = sp.trySplit();
System.out.println(three.getExactSizeIfKnown());
Spliterator<List<Integer>> four = sp.trySplit();
System.out.println(four.getExactSizeIfKnown());
which would print:
1024
2048
3072
4096
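The same batching can be reproduced with the JDK alone, without Guava. Here is a minimal sketch, assuming a plain lambda-based Iterable that only exposes iterator() and therefore falls back to the default Iterable.spliterator(). The class name UnknownSizeSplitDemo and the helper splitSizes are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class UnknownSizeSplitDemo {

    // Hypothetical helper: collects the sizes of the first `splits` chunks
    // that trySplit() hands out from the spliterator of `source`.
    static List<Long> splitSizes(Iterable<Integer> source, int splits) {
        Spliterator<Integer> root = source.spliterator();
        List<Long> sizes = new ArrayList<>();
        for (int i = 0; i < splits; i++) {
            Spliterator<Integer> prefix = root.trySplit();
            if (prefix == null) {
                break; // nothing left to split off
            }
            sizes.add(prefix.getExactSizeIfKnown());
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.range(0, 150_000).boxed().collect(Collectors.toList());

        // The lambda hides the list's size: only iterator() is visible,
        // so the default Iterable.spliterator() is used.
        Iterable<Integer> plain = data::iterator;

        System.out.println(plain.spliterator().hasCharacteristics(Spliterator.SIZED)); // prints false
        System.out.println(splitSizes(plain, 4)); // prints [1024, 2048, 3072, 4096]
    }
}
```

The root spliterator does not report SIZED, so it cannot be halved; it can only peel array batches off the front, which is why a small number of partitions all end up in the first batch.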
If you want to process 5000 elements at a time, you need to start with a Spliterator that has a known size. You could put those partitions into an ArrayList first:
public static void main(String[] args) {
    List<Integer> coll = IntStream.range(0, 15_000).boxed().collect(Collectors.toList());
    Iterable<List<Integer>> it = Iterables.partition(coll, 5000);

    // Copy the partitions into an ArrayList so the spliterator has a known size
    List<List<Integer>> list = new ArrayList<>();
    it.forEach(list::add);

    StreamSupport.stream(list.spliterator(), true)
            .map(x -> {
                // Log which thread handles which batch
                System.out.println(
                        "Thread : " + Thread.currentThread().getName() +
                        " processed elements in the range : " + x.get(0) + " , " + x.get(x.size() - 1)
                );
                return x;
            })
            .flatMap(List::stream)
            .collect(Collectors.toList());
}
On my machine, the output shows that each batch is processed by a different thread:
Thread : ForkJoinPool.commonPool-worker-5 processed elements in the range : 10000 , 14999
Thread : ForkJoinPool.commonPool-worker-19 processed elements in the range : 0 , 4999
Thread : main processed elements in the range : 5000 , 9999
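If the source is already a List, the intermediate copy can be avoided by building the batches as subList views, so the outer list has a known size from the start. This is only a sketch using the JDK, not what Iterables.partition does internally; BatchDemo and batches are hypothetical names. (Guava's Lists.partition also returns a List of sublists, so it may serve the same purpose.)

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BatchDemo {

    // Hypothetical helper: splits `source` into consecutive subList views
    // of at most `batchSize` elements each.
    static <T> List<List<T>> batches(List<T> source, int batchSize) {
        int count = (source.size() + batchSize - 1) / batchSize; // ceiling division
        return IntStream.range(0, count)
                .mapToObj(i -> source.subList(
                        i * batchSize,
                        Math.min(source.size(), (i + 1) * batchSize)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> coll = IntStream.range(0, 100_000).boxed().collect(Collectors.toList());
        List<List<Integer>> parts = batches(coll, 5_000);

        // The outer list is SIZED, so the parallel stream can split it by halving
        long total = parts.parallelStream()
                .mapToLong(batch -> batch.stream().mapToLong(Integer::longValue).sum())
                .sum();

        System.out.println(parts.size() + " batches, sum = " + total); // prints 20 batches, sum = 4999950000
    }
}
```

How the 20 batches are spread across threads still depends on the common pool and the machine, so the thread names will vary from run to run.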