I have a problem that I'm trying to solve with something I'm pretty sure I'm not supposed to do, but I don't see an alternative. I'm given a List of Strings and need to split it up into chunks of a given size. The result then has to be passed to some method for further processing. Since the list might be huge, the processing should be done asynchronously.
My approach is to create a custom Collector that takes the Stream of Strings and converts it to a Stream<List<Long>>:
final Stream<List<Long>> chunks = list
        .stream()
        .parallel()
        .collect(MyCollector.toChunks(CHUNK_SIZE))
        .flatMap(p -> doStuff(p))
        .collect(MyCollector.toChunks(CHUNK_SIZE))
        .map(...)
        ...
The code for the Collector:
public final class MyCollector<T, A extends List<List<T>>, R extends Stream<List<T>>> implements Collector<T, A, R> {

    private final AtomicInteger index = new AtomicInteger(0);
    private final AtomicInteger current = new AtomicInteger(-1);
    private final int chunkSize;

    private MyCollector(final int chunkSize) {
        this.chunkSize = chunkSize;
    }

    @Override
    public Supplier<A> supplier() {
        return () -> (A) new ArrayList<List<T>>();
    }

    @Override
    public BiConsumer<A, T> accumulator() {
        return (A candidate, T acc) -> {
            if (index.getAndIncrement() % chunkSize == 0) {
                candidate.add(new ArrayList<>(chunkSize));
                current.incrementAndGet();
            }
            candidate.get(current.get()).add(acc);
        };
    }

    @Override
    public BinaryOperator<A> combiner() {
        return (a1, a2) -> {
            a1.addAll(a2);
            return a1;
        };
    }

    @Override
    public Function<A, R> finisher() {
        return (a) -> (R) a.stream();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED));
    }

    public static <T> MyCollector<T, List<List<T>>, Stream<List<T>>> toChunks(final int chunkSize) {
        return new MyCollector<>(chunkSize);
    }
}
This seems to work in most cases, but I sometimes get an NPE. I'm sure the accumulator is not thread-safe, as two threads might interfere when adding new lists to the main list. I don't mind a chunk having a few too many or too few elements, though.
I've tried this instead of the current supplier function:
return () -> (A)new ArrayList<List<T>>(){{add(new ArrayList<T>());}};
to make sure there is always a list present. This doesn't work at all and results in empty lists.
Any help would be greatly appreciated.
Best, D
I can't comment yet, but I wanted to post the following link to a very similar issue (though not a duplicate, as far as I understand): Java 8 Stream with batch processing
You might also be interested in the following issue on GitHub: https://github.com/jOOQ/jOOL/issues/296
Now, your use of the CONCURRENT characteristic is wrong. The documentation says the following about Collector.Characteristics.CONCURRENT:

    Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.

This means that the supplier only gets called once, and the combiner actually never gets called (cf. the source of the ReferencePipeline.collect() method). That's why you sometimes got NPEs.
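To see this in action, here is a small self-contained sketch (the class name is mine, not from the question) that counts supplier and combiner invocations for a CONCURRENT/UNORDERED collector on a parallel stream:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class ConcurrentCharacteristicDemo {

    // Runs a parallel collect with a CONCURRENT/UNORDERED collector and
    // returns {supplierCalls, combinerCalls}.
    static int[] run() {
        AtomicInteger supplierCalls = new AtomicInteger();
        AtomicInteger combinerCalls = new AtomicInteger();

        Collector<Integer, ConcurrentLinkedQueue<Integer>, ConcurrentLinkedQueue<Integer>> collector =
                Collector.of(
                        () -> {
                            supplierCalls.incrementAndGet();
                            return new ConcurrentLinkedQueue<>();
                        },
                        ConcurrentLinkedQueue::add,
                        (a, b) -> {
                            combinerCalls.incrementAndGet();
                            a.addAll(b);
                            return a;
                        },
                        Collector.Characteristics.CONCURRENT,
                        Collector.Characteristics.UNORDERED,
                        Collector.Characteristics.IDENTITY_FINISH);

        IntStream.range(0, 100_000).boxed().parallel().collect(collector);

        return new int[]{supplierCalls.get(), combinerCalls.get()};
    }

    public static void main(String[] args) {
        int[] calls = run();
        // On the concurrent collection path, the single result container is
        // shared by all threads: one supplier call, zero combiner calls.
        System.out.println("supplier calls: " + calls[0]);
        System.out.println("combiner calls: " + calls[1]);
    }
}
```

With your collector, that single shared container is exactly why the unsynchronized accumulator races and produces NPEs.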
As a result, I suggest a simplified version of what you came up with:
public static <T> Collector<T, List<List<T>>, Stream<List<T>>> chunked(int chunkSize) {
    return Collector.of(
            ArrayList::new,
            (outerList, item) -> {
                if (outerList.isEmpty() || last(outerList).size() >= chunkSize) {
                    outerList.add(new ArrayList<>(chunkSize));
                }
                last(outerList).add(item);
            },
            (a, b) -> {
                a.addAll(b);
                return a;
            },
            List::stream,
            Collector.Characteristics.UNORDERED
    );
}

private static <T> T last(List<T> list) {
    return list.get(list.size() - 1);
}
Alternatively, you could write a truly concurrent Collector using proper synchronization, but if you don't mind having more than one list with a size less than chunkSize (which is the effect you can get with a non-concurrent Collector like the one I proposed above), I wouldn't bother.
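If you did want the concurrent variant, a minimal sketch (names are mine) could simply serialize the accumulator with a lock on the shared container. This keeps every chunk except the last one exactly chunkSize long, at the cost of contention:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ConcurrentChunked {

    public static <T> Collector<T, List<List<T>>, Stream<List<T>>> concurrentChunked(int chunkSize) {
        return Collector.of(
                ArrayList::new,
                (outerList, item) -> {
                    // With CONCURRENT, all threads share one container, so guard it.
                    synchronized (outerList) {
                        if (outerList.isEmpty() || outerList.get(outerList.size() - 1).size() >= chunkSize) {
                            outerList.add(new ArrayList<>(chunkSize));
                        }
                        outerList.get(outerList.size() - 1).add(item);
                    }
                },
                (a, b) -> {
                    // Never invoked on the concurrent path, kept for completeness.
                    a.addAll(b);
                    return a;
                },
                List::stream,
                Collector.Characteristics.CONCURRENT,
                Collector.Characteristics.UNORDERED);
    }

    // Chunks 0..999 into lists of 10 on a parallel stream.
    static List<List<Integer>> demo() {
        return IntStream.range(0, 1_000).boxed().parallel()
                .collect(concurrentChunked(10))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("chunks: " + demo().size()); // 100 full chunks of 10
    }
}
```

Note that the synchronized block makes the accumulation effectively sequential, which defeats much of the point of a parallel stream here; that's another reason I wouldn't bother.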