Tags: java, java-8, batch-processing, java-stream

Java 8 Stream with batch processing


I have a large file that contains a list of items.

I would like to split the items into batches and make an HTTP request for each batch (all of the items in a batch are needed as parameters in the HTTP request). I can do this very easily with a for loop, but as a Java 8 lover, I want to try writing it with Java 8's Stream framework (and reap the benefits of lazy processing).

Example:

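List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) {
    process(batch);
    batch = new ArrayList<>(BATCH_SIZE); // start a fresh batch
  }
}

// process the remaining, partially filled batch
if (!batch.isEmpty()) process(batch);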

I want to do something along the lines of lazyFileStream.group(500).map(processBatch).collect(toList())

What would be the best way to do this?
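For comparison, here is a minimal plain-JDK sketch of the `group(500)` step described above. The JDK 8 Stream API has no built-in batching operation, so the helper name `batches` is hypothetical. It wraps the source stream's iterator and only reads the next batch when the downstream stream asks for it, so it stays lazy even on an infinite stream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class Batches {

    // Hypothetical helper (not a JDK method): lazily splits `source` into lists of at most `size` items.
    static <T> Stream<List<T>> batches(Stream<T> source, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        Iterator<T> it = source.iterator();
        Iterator<List<T>> batchIterator = new Iterator<List<T>>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public List<T> next() {
                if (!it.hasNext()) {
                    throw new NoSuchElementException();
                }
                // Pull at most `size` items; the rest of the source is not read until the next batch is requested.
                List<T> batch = new ArrayList<>(size);
                while (batch.size() < size && it.hasNext()) {
                    batch.add(it.next());
                }
                return batch;
            }
        };
        // Wrap the iterator as a sequential, ordered stream; closing it also closes the source stream.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(batchIterator, Spliterator.ORDERED), false)
                .onClose(source::close);
    }

    public static void main(String[] args) {
        List<List<String>> result = batches(Stream.of("a", "b", "c", "d", "e"), 2)
                .collect(Collectors.toList());
        System.out.println(result); // [[a, b], [c, d], [e]]
    }
}
```

Applied to the question, something like `batches(Files.lines(path), 500).forEach(batch -> process(batch))` inside a try-with-resources block should issue one request per 500 lines without loading the whole file; `path` and `process` are placeholders here. Each batch is a fresh `ArrayList`, so `process` may safely keep a reference to it.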


Solution

  • Note: this solution reads the whole file into memory before running the forEach, because groupBy() collects every element into a Map first.

    You could do it with jOOλ, a library that extends Java 8 streams for single-threaded, sequential stream use-cases:

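    import java.util.stream.Collectors;
    import org.jooq.lambda.Seq;

    Seq.seq(lazyFileStream)              // Seq<String>
       .zipWithIndex()                   // Seq<Tuple2<String, Long>>
       .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<Tuple2<String, Long>>>
       .forEach((index, batch) -> {
           // each tuple is (item, index); keep only the items before processing
           process(batch.stream().map(tuple -> tuple.v1).collect(Collectors.toList()));
       });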
    

    Behind the scenes, zipWithIndex() is just:

    static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
        final Iterator<T> it = stream.iterator();
    
        class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
            long index;
    
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }
    
            @Override
            public Tuple2<T, Long> next() {
                return tuple(it.next(), index++);
            }
        }
    
        return seq(new ZipWithIndex());
    }
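This wrapper is itself lazy: it only advances the source iterator on demand, so it is groupBy(), not zipWithIndex(), that forces the whole file to be read. As an illustration, here is a JDK-only sketch of the same wrapper. It uses `Map.Entry<T, Long>` (via `AbstractMap.SimpleImmutableEntry`) in place of jOOλ's `Tuple2<T, Long>`, and the class name `ZipWithIndex` is hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ZipWithIndex {

    // Pairs each element with its position, mirroring the jOOλ iterator above
    // but producing Map.Entry<T, Long> instead of jOOλ's Tuple2<T, Long>.
    static <T> Stream<Map.Entry<T, Long>> zipWithIndex(Stream<T> stream) {
        Iterator<T> it = stream.iterator();
        Iterator<Map.Entry<T, Long>> zipped = new Iterator<Map.Entry<T, Long>>() {
            private long index;

            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public Map.Entry<T, Long> next() {
                // it.next() throws NoSuchElementException once the source is exhausted
                return new SimpleImmutableEntry<>(it.next(), index++);
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(zipped, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        List<String> pairs = zipWithIndex(Stream.of("a", "b", "c"))
                .map(e -> e.getKey() + "@" + e.getValue())
                .collect(Collectors.toList());
        System.out.println(pairs); // [a@0, b@1, c@2]
    }
}
```

Because nothing is buffered beyond the current element, this also works on an infinite stream, for example `zipWithIndex(Stream.iterate(0, i -> i + 1)).skip(5).findFirst()`.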
    

    ... whereas groupBy() is just an API convenience for:

    default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
        return collect(Collectors.groupingBy(classifier));
    }
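Since groupBy() boils down to `collect(Collectors.groupingBy(classifier))`, the whole index-then-group pipeline can be approximated with the JDK alone when the data is already in a `List`. This is a sketch under that assumption: the names `IndexGrouping` and `groupByIndex` are hypothetical, indices are `int` rather than jOOλ's `Long`, and a `TreeMap` is passed as the map factory so batches come out in order (the single-argument `groupingBy` uses a `HashMap`, which makes no ordering guarantee). A downstream `Collectors.mapping` keeps the items and drops the indices.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class IndexGrouping {

    // Hypothetical helper: groups list elements into batches keyed by index / batchSize.
    // Like groupBy(), it builds the complete Map before any batch can be processed.
    static <T> Map<Integer, List<T>> groupByIndex(List<T> data, int batchSize) {
        return IntStream.range(0, data.size())
                .boxed()
                .collect(Collectors.groupingBy(
                        i -> i / batchSize,                                   // batch number
                        TreeMap::new,                                         // ascending batch order
                        Collectors.mapping(data::get, Collectors.toList()))); // index -> item
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c", "d", "e");
        groupByIndex(data, 2).forEach((index, batch) -> System.out.println(index + " -> " + batch));
        // 0 -> [a, b]
        // 1 -> [c, d]
        // 2 -> [e]
    }
}
```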
    

    (Disclaimer: I work for the company behind jOOλ)