I have a java stream of undefined length. Now I need to load some meta data from the database and assign it to the streamed data.
I cannot:
Thus I thought I could load the metadata in partitions from the database.
I need a method like this:
<T> Stream<List<T>> partition(Stream<T> stream, int partitionSize)
so I can use it like this
partition(dataSource.stream(), 1000)
.map(metadataSource::populate)
.flatMap(List::stream)
.forEach(this::doSomething);
I already found Guava's Iteralbes#partition but that would force me to convert the stream to an iterable, partition it and convert it to a stream again. Is there something inbuilt for the stream partitioning or is there an easy way to implement it myself?
I haven't found an existing method that does this already, so I implemented one myself:
public class Partitioner<E> implements Iterator<List<E>> {
private final Iterator<E> iterator;
private final int partitionSize;
public static <T> Stream<List<T>> partition(final Stream<T> stream, final int partitionSize) {
return new Partitioner<>(stream, partitionSize).asStream();
}
public Partitioner(final Stream<E> stream, final int partitionSize) {
this(stream.iterator(), partitionSize);
}
public Partitioner(final Iterator<E> iterator, final int partitionSize) {
this.iterator = iterator;
this.partitionSize = partitionSize;
}
@Override
public boolean hasNext() {
return this.iterator.hasNext();
}
@Override
public List<E> next() {
if (!hasNext()) {
throw new NoSuchElementException("No more elements");
}
final ArrayList<E> result = new ArrayList<>(this.partitionSize);
for (int i = 0; i < this.partitionSize && hasNext(); i++) {
result.add(this.iterator.next());
}
return result;
}
public Stream<List<E>> asStream() {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(this, Spliterator.NONNULL), false);
}
}