java java-stream collectors

Collector to split stream up into chunks of given size


I've got a problem at hand that I'm trying to solve with something I'm pretty sure I'm not supposed to do, but I don't see an alternative. I'm given a List of Strings and should split it into chunks of a given size. The result then has to be passed to some method for further processing. As the list might be huge, the processing should be done asynchronously.

My approach is to create a custom Collector that takes the Stream of Strings and converts it to a Stream<List<Long>>:

final Stream<List<Long>> chunks = list
                        .stream()
                        .parallel()
                        .collect(MyCollector.toChunks(CHUNK_SIZE)) 
                        .flatMap(p -> doStuff(p))
                        .collect(MyCollector.toChunks(CHUNK_SIZE))
                        .map(...)
                        ...

The code for the Collector:

public final class MyCollector<T, A extends List<List<T>>, R extends Stream<List<T>>> implements Collector<T, A, R> {
private final AtomicInteger index = new AtomicInteger(0);
private final AtomicInteger current = new AtomicInteger(-1);
private final int chunkSize;

private MyCollector(final int chunkSize){
    this.chunkSize = chunkSize;
}

@Override
public Supplier<A> supplier() {
    return () -> (A)new ArrayList<List<T>>();
}

@Override
public BiConsumer<A, T> accumulator() {
    return (A candidate, T acc) -> {
        if (index.getAndIncrement() % chunkSize == 0){
            candidate.add(new ArrayList<>(chunkSize));
            current.incrementAndGet();
        }
        candidate.get(current.get()).add(acc);
    };
}

@Override
public BinaryOperator<A> combiner() {
    return (a1, a2) -> {
        a1.addAll(a2);
        return a1;
    };
}
@Override
public Function<A, R> finisher() {
    return (a) -> (R)a.stream();
}

@Override
public Set<Characteristics> characteristics() {
    return Collections.unmodifiableSet(EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED));
}

public static <T> MyCollector<T, List<List<T>>, Stream<List<T>>> toChunks(final int chunkSize){
    return new MyCollector<>(chunkSize);
}

}

This seems to work in most cases, but I sometimes get an NPE. I'm sure the code in the accumulator is not thread-safe, as two threads might interfere when adding new Lists to the main List. I don't mind a chunk having a few too many or too few elements, though.

I've tried this instead of the current supplier function:

 return () -> (A)new ArrayList<List<T>>(){{add(new ArrayList<T>());}};

The idea was to make sure there is always a List present, but this doesn't work at all and results in empty lists.

Issues:

  • I'm pretty sure a custom Spliterator would be a good solution. It would not work for synchronous scenarios, however. Also, can I be sure the Spliterator is called?
  • I'm aware I shouldn't have state at all, but I'm not sure how to change it.

Questions:

  • Is this approach completely wrong or somehow fixable?
  • If I use a Spliterator - can I be sure it's called or is that decided by the underlying implementation?
  • I'm pretty sure the casts to (A) and (R) in the supplier and finisher are not necessary but IntelliJ complains. Is there something I'm missing?

EDIT:

  • I've added some more to the client code as the suggestions with IntStream.range won't work when chained.
  • I realize I could do it differently as suggested in a comment but it's also a little bit about style and knowing if it's possible.
  • I have the CONCURRENT characteristic because I assume the Stream API would fall back to synchronous handling otherwise. The solution is not thread-safe, as stated before.

Any help would be greatly appreciated.

Best, D


Solution

  • I can't comment yet, but I wanted to post the following link to a very similar issue (though not a duplicate, as far as I understand): Java 8 Stream with batch processing

    You might also be interested in the following issue on GitHub: https://github.com/jOOQ/jOOL/issues/296


    Now, your use of the CONCURRENT characteristic is wrong. The docs say the following about Collector.Characteristics.CONCURRENT:

    Indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.

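    This means that, for a parallel stream and a collector that is both CONCURRENT and UNORDERED, the supplier only gets called once and the combiner never gets called (cf. the source of the ReferencePipeline.collect() method). All threads then accumulate into the same ArrayList, which is not thread-safe. That's why you got NPEs sometimes.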
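To see this behaviour for yourself, here is a minimal sketch that counts how often the supplier and the combiner are invoked during a parallel collect(). The class and method names (ConcurrentCollectorDemo, countCalls) are made up for illustration, and the counts reflect how OpenJDK implements ReferencePipeline.collect(), so treat them as an observation rather than a documented guarantee. A ConcurrentLinkedQueue is used as the container so that the CONCURRENT flag is legitimate here.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical demo class; none of these names come from the original post.
class ConcurrentCollectorDemo {

    // Runs one parallel collect() over 10,000 integers and returns
    // {supplier calls, combiner calls, number of collected elements}.
    static int[] countCalls(boolean concurrent) {
        AtomicInteger supplierCalls = new AtomicInteger();
        AtomicInteger combinerCalls = new AtomicInteger();

        Collector.Characteristics[] flags = concurrent
                ? new Collector.Characteristics[] {
                        Collector.Characteristics.CONCURRENT,
                        Collector.Characteristics.UNORDERED }
                : new Collector.Characteristics[] {
                        Collector.Characteristics.UNORDERED };

        // ConcurrentLinkedQueue is thread-safe, so sharing one container is fine.
        Collector<Integer, Queue<Integer>, Queue<Integer>> collector =
                Collector.<Integer, Queue<Integer>>of(
                        () -> {
                            supplierCalls.incrementAndGet();
                            return new ConcurrentLinkedQueue<Integer>();
                        },
                        (queue, item) -> queue.add(item),
                        (a, b) -> {
                            combinerCalls.incrementAndGet();
                            a.addAll(b);
                            return a;
                        },
                        flags);

        Queue<Integer> result = IntStream.range(0, 10_000)
                .boxed()
                .parallel()
                .collect(collector);

        return new int[] { supplierCalls.get(), combinerCalls.get(), result.size() };
    }

    public static void main(String[] args) {
        int[] c = countCalls(true);
        System.out.println("CONCURRENT:     supplier=" + c[0] + ", combiner=" + c[1]);
        int[] n = countCalls(false);
        System.out.println("non-CONCURRENT: supplier=" + n[0] + ", combiner=" + n[1]);
    }
}
```

With the CONCURRENT flag set, every worker thread writes into the single container returned by the supplier. Without it, each subtask gets its own container and the partial results are merged by the combiner.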

    As a result, I suggest a simplified version of what you came up with:

    public static <T> Collector<T, List<List<T>>, Stream<List<T>>> chunked(int chunkSize) {
      return Collector.of(
              ArrayList::new,
              (outerList, item) -> {
                if (outerList.isEmpty() || last(outerList).size() >= chunkSize) {
                  outerList.add(new ArrayList<>(chunkSize));
                }
                last(outerList).add(item);
              },
              (a, b) -> {
                a.addAll(b);
                return a;
              },
              List::stream,
              Collector.Characteristics.UNORDERED
      );
    }
    
    private static <T> T last(List<T> list) {
      return list.get(list.size() - 1);
    }
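For illustration, here is how the collector above might be used. The wrapper class ChunkedUsage and the helper methods chunkSequential and chunkParallel are hypothetical names added for this sketch; chunked and last are repeated only so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical wrapper class for trying out the chunked() collector.
class ChunkedUsage {

    // Same collector as above, repeated to keep the example self-contained.
    static <T> Collector<T, List<List<T>>, Stream<List<T>>> chunked(int chunkSize) {
        return Collector.of(
                ArrayList::new,
                (outerList, item) -> {
                    if (outerList.isEmpty() || last(outerList).size() >= chunkSize) {
                        outerList.add(new ArrayList<>(chunkSize));
                    }
                    last(outerList).add(item);
                },
                (a, b) -> {
                    a.addAll(b);
                    return a;
                },
                List::stream,
                Collector.Characteristics.UNORDERED);
    }

    private static <T> T last(List<T> list) {
        return list.get(list.size() - 1);
    }

    // Chunks the numbers 1..n on a sequential stream.
    static List<List<Integer>> chunkSequential(int n, int chunkSize) {
        return IntStream.rangeClosed(1, n).boxed()
                .collect(chunked(chunkSize))
                .collect(Collectors.toList());
    }

    // Chunks the numbers 1..n on a parallel stream; some chunks may be
    // shorter than chunkSize because each subtask fills its own lists.
    static List<List<Integer>> chunkParallel(int n, int chunkSize) {
        return IntStream.rangeClosed(1, n).boxed().parallel()
                .collect(chunked(chunkSize))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(chunkSequential(7, 3)); // [[1, 2, 3], [4, 5, 6], [7]]
        System.out.println(chunkParallel(1000, 7).size() + " chunks in parallel");
    }
}
```

On a parallel stream no element is lost or duplicated, but the number of chunks and the position of the short ones depend on how the stream happens to be split.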
    

    Alternatively, you could write a truly concurrent Collector using proper synchronization, but if you don't mind having more than one list with a size less than chunkSize (which is the effect you can get with a non-concurrent Collector like the one I proposed above), I wouldn't bother.
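If you do want the concurrent variant, one possible sketch is to keep the same logic and guard the shared container with a lock. The names SynchronizedChunks, chunkedConcurrent and chunkParallel are assumptions made for this example, and synchronizing on the container is just one way to do it, not necessarily the fastest. Since every thread contends for the same lock, this mainly pays off when the work before the collect() dominates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical class name; a sketch of a synchronized, truly concurrent variant.
class SynchronizedChunks {

    static <T> Collector<T, List<List<T>>, Stream<List<T>>> chunkedConcurrent(int chunkSize) {
        return Collector.of(
                ArrayList::new,
                (outerList, item) -> {
                    // All threads share one container, so guard every mutation.
                    synchronized (outerList) {
                        if (outerList.isEmpty()
                                || outerList.get(outerList.size() - 1).size() >= chunkSize) {
                            outerList.add(new ArrayList<>(chunkSize));
                        }
                        outerList.get(outerList.size() - 1).add(item);
                    }
                },
                (a, b) -> {
                    // Only reached when the collector is not run in concurrent mode.
                    a.addAll(b);
                    return a;
                },
                List::stream,
                Collector.Characteristics.CONCURRENT,
                Collector.Characteristics.UNORDERED);
    }

    // Chunks the numbers 1..n on a parallel stream.
    static List<List<Integer>> chunkParallel(int n, int chunkSize) {
        return IntStream.rangeClosed(1, n).boxed().parallel()
                .collect(chunkedConcurrent(chunkSize))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Integer>> chunks = chunkParallel(1000, 7);
        long full = chunks.stream().filter(c -> c.size() == 7).count();
        System.out.println(chunks.size() + " chunks, " + full + " of them full");
    }
}
```

Because there is only one container, at most the final chunk ends up shorter than chunkSize. The order of elements within and across chunks is arbitrary.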