Search code examples
javajava-streamillegalstateexception

Understanding problem regarding operated streams in Java


Let's say we have the following code

Stream<Integer> ints = Stream.of(1, 2, 3);
    
ints.peek(System.out::println);
    
ints.forEach(System.out::println);

and I run it, I'll get an exception:

Exception in thread "main" java.lang.IllegalStateException:
stream has already been operated upon or closed

But why?

peek is an intermediate operation, so I thought, that it won't run/start the stream itself? Only forEach does this, so why has the stream already been operated when reaching forEach?

I also thought, that the peek method would be discarded, because it returns a new Stream that I do not consider. Like in

String str = "hello world";
str.toUppercase();
str.charAt(0); // <-- h not H

Solution

  • peek is an intermediate operation, which means it performs an action and produces a new Stream. But you're reusing the same Stream, therefore getting an IllegalStateException.

    Here's a quote, from the Stream's Javadoc :

    A stream should be operated on (invoking an intermediate or terminal stream operation) only once. This rules out, for example, "forked" streams, where the same source feeds two or more pipelines, or multiple traversals of the same stream. A stream implementation may throw IllegalStateException if it detects that the stream is being reused. However, since some stream operations may return their receiver rather than a new stream object, it may not be possible to detect reuse in all cases.

    Amphesys added

    Note: as @Holger has pointed out documentation states (see the quote above) that in some cases intermediate stream operation might return the received stream (representing the previous stage) instead of generating a new one.

    If you write the code like this, it would work as intended:

    Stream<Integer> ints = Stream.of(1, 2, 3);
        
    Stream<Integer> ints1 = ints.peek(System.out::println);
        
    ints1.forEach(System.out::println);
    

    Which is basically a verbose equivalent of the chain:

    Stream.of(1, 2, 3)                 // ints
        .peek(System.out::println)     // ints1
        .forEach(System.out::println);
    

    Your understanding that a Stream would be galvanized into action only if it has a terminal operation is correct. And the stream you've presented in the code has a terminal operation.

    But it's worth to note that the exception occurs not while executing the stream, but while preparing the pipeline. I.e. when the Stream instance is being initialized.

    There's an internal class AbstractPipeline which we as users of the API not interacting directly, it's purpose defined as follows:

    Abstract base class for "pipeline" classes, which are the core

    • implementations of the Stream interface and its primitive specializations.

    • Manages construction and evaluation of stream pipelines

    Here's a piece of code from this class (link to the source):

    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)    // <- Note
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true; // <- Note
        previousStage.nextStage = this;
        
        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }
    

    In this implementation, each operation is represented as a stage. And stages are linked with each other. While constructing the next stage a link to next stage should be provided and boolean property linkedOrConsumed of the previous stage is being to true. If the previous stage has been already linked (which has happened in your code) the constructor throws an exception.

    For that reason, even the following code, which lacks terminal operation, would trigger an IllegalStateException (JDK 17, HotSpot JVM). It would take place during the preparation of the Stream object (not while executing the pipeline).

    Stream<Integer> ints = Stream.of(1, 2, 3);
    
    ints.map(i -> i * i);
    ints.takeWhile(i -> i % 2 != 0); // exception would be thrown while initializing this stage
    

    Note: this information is an implementation detail, and proved purely as explanation of the behavior OP has encountered. And the code snippet with a Stream with no terminal operation certainly is not guaranteed to behave this way (for instance, some implementation might simply discard it). The bottom line here is that a Stream should be consumed only once.