java, java-8, java-stream, java-10

Internal changes for limit and unordered stream


Basically this came up while trying to answer another question. Suppose this code:

AtomicInteger i = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
IntStream.generate(() -> i.incrementAndGet())
        .parallel()
        .peek(x -> count.incrementAndGet())
        .limit(5)
        .forEach(System.out::println);

System.out.println("count = " + count);

I understand that IntStream#generate produces an unordered infinite stream and that, for it to finish, there has to be a short-circuiting operation (limit in this case). I also understand that the Supplier is free to be called as many times as the Stream implementation feels like before it reaches that limit.
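
For comparison, here is the same pipeline run sequentially (just a sketch, simply dropping parallel()); the sequential slice operation stops pulling from the generator once the limit is reached, so count should end up as exactly 5:

AtomicInteger i = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
// same pipeline as above, only without parallel()
IntStream.generate(() -> i.incrementAndGet())
        .peek(x -> count.incrementAndGet())
        .limit(5)
        .forEach(System.out::println);

System.out.println("count = " + count); // count = 5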

Running the parallel version under java-8 always prints count = 512 (well, maybe not always, but it does on my machine).

In contrast, running it under java-10, count rarely exceeds 5. So my question is: what changed internally so that the short-circuiting works so much better? (I am trying to answer this on my own by looking at the sources and doing some diffs...)


Solution

  • The change happened somewhere between Java 9, beta 103 and Java 9, beta 120 (JDK‑8154387).

    The responsible class is StreamSpliterators.UnorderedSliceSpliterator.OfInt, or rather its super class StreamSpliterators.UnorderedSliceSpliterator.

    The old version of the class looked like

    abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
        static final int CHUNK_SIZE = 1 << 7;
    
        // The spliterator to slice
        protected final T_SPLITR s;
        protected final boolean unlimited;
        private final long skipThreshold;
        private final AtomicLong permits;
    
        UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
            this.s = s;
            this.unlimited = limit < 0;
            this.skipThreshold = limit >= 0 ? limit : 0;
            this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
        }
    
        UnorderedSliceSpliterator(T_SPLITR s,
                                  UnorderedSliceSpliterator<T, T_SPLITR> parent) {
            this.s = s;
            this.unlimited = parent.unlimited;
            this.permits = parent.permits;
            this.skipThreshold = parent.skipThreshold;
        }
    

            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                Objects.requireNonNull(action);
    
                ArrayBuffer.OfRef<T> sb = null;
                PermitStatus permitStatus;
                while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                    if (permitStatus == PermitStatus.MAYBE_MORE) {
                        // Optimistically traverse elements up to a threshold of CHUNK_SIZE
                        if (sb == null)
                            sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
                        else
                            sb.reset();
                        long permitsRequested = 0;
                        do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
                        if (permitsRequested == 0)
                            return;
                        sb.forEach(action, acquirePermits(permitsRequested));
                    }
                    else {
                        // Must be UNLIMITED; let 'er rip
                        s.forEachRemaining(action);
                        return;
                    }
                }
            }
    

    As we can see, it attempts to buffer up to CHUNK_SIZE = 1 << 7 elements in each spliterator, which may end up at “number of CPU cores”×128 elements.
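
    To get a feel for that bound, a rough back-of-the-envelope sketch (assuming roughly one buffering spliterator per core of the common pool; the exact number of leaf spliterators can vary):

    int cores = Runtime.getRuntime().availableProcessors();
    int oldChunkSize = 1 << 7; // the fixed CHUNK_SIZE = 128 of the old code
    // Each buffering spliterator may optimistically pull up to CHUNK_SIZE
    // elements from the source before checking the remaining permits, so:
    System.out.println("worst case ~ " + (cores * oldChunkSize) + " elements");
    // e.g. 4 * 128 = 512, which would match the count observed in the question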

    In contrast, the new version looks like

    abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
        static final int CHUNK_SIZE = 1 << 7;
    
        // The spliterator to slice
        protected final T_SPLITR s;
        protected final boolean unlimited;
        protected final int chunkSize;
        private final long skipThreshold;
        private final AtomicLong permits;
    
        UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
            this.s = s;
            this.unlimited = limit < 0;
            this.skipThreshold = limit >= 0 ? limit : 0;
            this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
                ((skip + limit) / AbstractTask.LEAF_TARGET) + 1) : CHUNK_SIZE;
            this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
        }
    
        UnorderedSliceSpliterator(T_SPLITR s,
                                  UnorderedSliceSpliterator<T, T_SPLITR> parent) {
            this.s = s;
            this.unlimited = parent.unlimited;
            this.permits = parent.permits;
            this.skipThreshold = parent.skipThreshold;
            this.chunkSize = parent.chunkSize;
        }
    

            @Override
            public void forEachRemaining(Consumer<? super T> action) {
                Objects.requireNonNull(action);
    
                ArrayBuffer.OfRef<T> sb = null;
                PermitStatus permitStatus;
                while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                    if (permitStatus == PermitStatus.MAYBE_MORE) {
                        // Optimistically traverse elements up to a threshold of chunkSize
                        if (sb == null)
                            sb = new ArrayBuffer.OfRef<>(chunkSize);
                        else
                            sb.reset();
                        long permitsRequested = 0;
                        do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
                        if (permitsRequested == 0)
                            return;
                        sb.forEach(action, acquirePermits(permitsRequested));
                    }
                    else {
                        // Must be UNLIMITED; let 'er rip
                        s.forEachRemaining(action);
                        return;
                    }
                }
            }
    

    So now there is an instance field chunkSize: when there is a defined limit and the expression ((skip + limit) / AbstractTask.LEAF_TARGET) + 1 evaluates to something smaller than CHUNK_SIZE, that smaller value is used. So for small limits, the chunkSize will be much smaller as well. In your case, with a limit of 5, the chunk size will always be 1.
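
    Plugging the numbers from the question into the new formula makes that concrete (a sketch; LEAF_TARGET is assumed here to be four times the common pool's parallelism, which is how AbstractTask derives it):

    long skip = 0, limit = 5; // the pipeline in the question uses limit(5) and no skip
    int CHUNK_SIZE = 1 << 7;  // 128
    int leafTarget = java.util.concurrent.ForkJoinPool.getCommonPoolParallelism() << 2;
    int chunkSize = (int) Math.min(CHUNK_SIZE, ((skip + limit) / leafTarget) + 1);
    System.out.println(chunkSize); // prints 1 whenever leafTarget > 5 (common-pool parallelism >= 2)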