Tags: java, java-stream, collectors, consumer

Why is my `Stream` being closed at runtime?


I have this bit of code:

var blong = Stream.iterate(BigInteger.ZERO, bi -> bi.add(BigInteger.ONE))
    .collect(Collector.of(
        () -> Stream.of(),
        (s, bi) -> Stream.concat(s, Stream.of(bi)),
        (s1, s2) -> Stream.concat(s1, s2),
        s -> s
    ));

System.out.println(blong.getClass().getName());

It doesn't work properly. I'm getting an IllegalStateException:

Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.base/java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:346)
    at java.base/java.util.stream.Stream.concat(Stream.java:1618)
    at UninitializedTest.lambda$2(UninitializedTest.java:28)
    at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.base/java.util.stream.Stream$1.tryAdvance(Stream.java:1469)
    at java.base/java.util.Spliterator.forEachRemaining(Spliterator.java:332)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at UninitializedTest.main(UninitializedTest.java:27)

It appears that the Streams being returned by my Supplier are being closed immediately upon creation.

Even if I create a Stream from an empty List or a Stream with some actual BigInteger data in it, I get the same error.

Why is my stream being closed?


Solution

  • To begin with, let's dissect the custom collector you've created:

    Collector.of(
        () -> Stream.of(),                          // Supplier: creates the container, the object meant to accumulate the result by consuming stream elements (here it can't really change its state, but nominally it's still a container)
        (s, bi) -> Stream.concat(s, Stream.of(bi)), // Accumulator: defines how stream elements should be accumulated in the container (again, here we can't do anything with the container)
        (s1, s2) -> Stream.concat(s1, s2),          // Combiner: defines how containers should be merged when the stream is executed in parallel (NB: it can be replaced with the method reference Stream::concat)
        s -> s                                      // Finisher: describes the final transformation applied to the container (NB: since it does nothing here, it can be omitted; there's an overloaded version that doesn't take a finisher)
    )
    

    First of all, it's worth pointing out that a Stream is not a container of data (like a Collection).

    Hence, providing an empty stream () -> Stream.of() as the container of the collector is a mistake: the container needs to be mutable, but we can't push elements into an empty stream.
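For contrast, here is a minimal sketch of a collector whose container really is mutable. It assumes an ArrayList is an acceptable container and that the infinite source is bounded with limit() before collecting; the class name MutableContainerDemo and the helper firstN are hypothetical, chosen only for illustration.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Stream;

class MutableContainerDemo {

    // Hypothetical helper: collects the first `count` elements of 0, 1, 2, ...
    static List<BigInteger> firstN(int count) {
        Collector<BigInteger, List<BigInteger>, List<BigInteger>> toList = Collector.of(
            ArrayList::new,          // supplier: creates a mutable container
            List::add,               // accumulator: mutates the container in place
            (left, right) -> {       // combiner: merges two containers (parallel case)
                left.addAll(right);
                return left;
            }
        );
        return Stream.iterate(BigInteger.ZERO, bi -> bi.add(BigInteger.ONE))
            .limit(count)            // assumption: bound the infinite source before collect()
            .collect(toList);
    }

    public static void main(String[] args) {
        System.out.println(firstN(3)); // prints [0, 1, 2]
    }
}
```

The accumulator returns nothing useful to the collector; all the work happens through the side effect on the list, which is why the container has to be mutable.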

    Secondly, the accumulator (the second argument of Collector.of()) of your custom collector is not doing what you probably expect it to do.

    The accumulator is a BiConsumer<A, T> (A being the container type and T the element type), and you've implemented it as follows:

    (s, bi) -> Stream.concat(s, Stream.of(bi))
    

    Here Stream.concat() takes over both the stream s and the stream returned by Stream.of(bi) (it obtains their spliterators, which marks both streams as operated upon) and returns a new, unnamed stream, which very soon becomes prey for the garbage collector. Reminder: BiConsumer doesn't return a value, so the stream returned by concat() vanishes.

    The stream s remains (meaning the collector still holds its reference), but it has already been used up by concat(), so it can no longer be operated upon. This happens when the first stream element (BigInteger.ZERO) gets processed. When the collector tries to process the second element, you get an exception because concat() attempts to use the stream s again.
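The failure described above can be reproduced without any collector. The sketch below is only an illustration: the class name ConcatReuseDemo and the helper reuseMessage are hypothetical, and the two concat() calls stand in for the accumulator being invoked for the first and the second element.

```java
import java.util.stream.Stream;

class ConcatReuseDemo {

    // Hypothetical helper: returns the message of the exception thrown by the second concat() call.
    static String reuseMessage() {
        Stream<Integer> s = Stream.of();      // plays the role of the collector's container
        Stream.concat(s, Stream.of(0));       // first element: result is discarded, s is now used up
        try {
            Stream.concat(s, Stream.of(1));   // second element: s cannot be operated upon again
            return "no exception";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(reuseMessage());
    }
}
```

On the JDK from the stack trace in the question, the printed message should match the one shown there.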


    "When the Consumer fires, I'm expecting the Stream<Stream<BigInteger>> to be consumed, returning a Stream<BigInteger>"

    Firstly, BiConsumer, like Consumer, has a single abstract method accept() that is void; it isn't meant to return anything.

    "It appears that the Streams being returned by my Supplier are being closed"

    Secondly, it feels like you have a misconception about how a Collector works. In the sequential scenario, an instance of the mutable container is created only once. In parallel, a separate container is created for each part the stream is split into, unless you mark the collector as concurrent by providing Collector.Characteristics.CONCURRENT, in which case all threads can share the same container.
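A small sketch can make the sequential case concrete by counting how often the supplier runs. The class name SupplierCallCountDemo, the helper supplierCalls, and the use of an AtomicInteger as a counter are assumptions made for this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Stream;

class SupplierCallCountDemo {

    // Hypothetical helper: counts supplier invocations for a sequential stream of five elements.
    static int supplierCalls() {
        AtomicInteger calls = new AtomicInteger();
        Collector<Integer, List<Integer>, List<Integer>> counting = Collector.of(
            () -> {                               // supplier: records each invocation
                calls.incrementAndGet();
                return new ArrayList<>();
            },
            List::add,                            // accumulator: called once per element
            (left, right) -> {                    // combiner: only relevant in parallel
                left.addAll(right);
                return left;
            }
        );
        Stream.of(1, 2, 3, 4, 5).collect(counting);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(supplierCalls()); // prints 1
    }
}
```

So the supplier in the question does not hand out a fresh stream per element; the single stream it creates is reused for every call of the accumulator.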

    The container should be a mutable object (otherwise it is of no use, as in your case), and its job is to accumulate stream elements. I.e. the container changes its state while the collector consumes elements from the stream.

    The stream returned by Stream.iterate(seed,f) would be of type Stream<BigInteger>.

    The container produced by the supplier () -> Stream.of() would be of type Stream<Object>, because the compiler can't infer the element type of the empty stream unless you specify it explicitly using a type witness like Stream.<BigInteger>of().
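The effect of a type witness can be seen in a standalone sketch. Note the assumption: this uses var declarations rather than the Collector.of() call from the question, so it only illustrates the inference rule, not the exact inference performed inside the collector. The class name TypeWitnessDemo and the helper emptyBigIntegers are hypothetical.

```java
import java.math.BigInteger;
import java.util.stream.Stream;

class TypeWitnessDemo {

    // Hypothetical helper: an empty stream whose element type is pinned by a type witness.
    static Stream<BigInteger> emptyBigIntegers() {
        return Stream.<BigInteger>of();
    }

    public static void main(String[] args) {
        // Without a target type or a witness, the element type is inferred as Object.
        var inferred = Stream.of();
        Stream<Object> asObjects = inferred;        // compiles: inferred is a Stream<Object>

        // With the witness, the element type is BigInteger.
        var typed = Stream.<BigInteger>of();
        Stream<BigInteger> asBigIntegers = typed;   // compiles: typed is a Stream<BigInteger>

        // Both streams are empty; only their static types differ.
        System.out.println(asObjects.count() + asBigIntegers.count());
    }
}
```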

    For that reason, the stream returned by concat() inside the accumulator would also be of type Stream<Object>. And, as a reminder, this stream would be ignored.

    There wouldn't be such a beast as a Stream<Stream<BigInteger>> anywhere, either in the pipeline or inside the collector.

    And lastly, I'll reiterate that it's inherently impossible to add an element into a Stream.