java java-stream illegalstateexception

Stream supplier getting error 'stream has already been operated upon or closed'


Even though I am using a Supplier for my streams and calling Supplier.get() each time I want to retrieve my stream and perform a terminal operation on it, I am still getting the "stream has already been operated upon or closed" exception. Could anyone please look at my code and suggest what I am doing wrong?

Method where exception is being thrown:

private static void printMyDetails(Supplier<Stream<Result>> mySupplier, String myStatus) {
        checkNotNull(mySupplier);
        checkArgument(isNotEmpty(myStatus), "Invalid My status");

        if (mySupplier.get().noneMatch(result -> true)) { //<-This is where the exception is thrown
            if (log.isInfoEnabled()) {
                log.info("Number of My with status '{}': {}", myStatus, 0);
            }
        } else {
            log.info("Number of My with status '{}': {}", myStatus, mySupplier.get().count());
            log.info("Details of My(s) with status: '{}'", myStatus);
            mySupplier.get().sorted().forEach(Utils::printMyNameAndDetails);
        }
    }

Place which is calling the above method:

rb.keySet().stream().filter(key -> containsIgnoreCase(key, "status")).map(rb::getString)
                .filter(StringUtils::isNotEmpty).forEach(status -> {
            var resultsWithExpectedStatusSupplier = requireNonNull(getResultsWithExpectedStatusSupplier(results, status));
            resultsWithExpectedStatusSupplier.ifPresentOrElse(streamSupplier -> printMyDetails(streamSupplier, status), () -> {
                if (log.isInfoEnabled())
                    log.info("0 My with status: {}", status);
            });
        });

The stream supplier:

private static Optional<Supplier<Stream<Result>>> getResultsWithExpectedStatusSupplier(
            @NotNull List<Result> results, @NotNull String expectedStatus) {
        checkArgument(!results.isEmpty(), "Results list is empty");
        checkArgument(isNotEmpty(expectedStatus), "Invalid expected status");

        var resultStreamWithExpectedStatus = requireNonNull(results.stream().filter(result -> ofNullable(result).map(Result::getStatus)
                .allMatch(s -> isNotEmpty(s) && s.equalsIgnoreCase(expectedStatus))));
        return resultStreamWithExpectedStatus.count() == 0 ? Optional.empty() : Optional.of(() -> resultStreamWithExpectedStatus);
    }

Solution

  • The general problem is as Christian Ullenboom said: the stream has already been consumed. The exact location in your code is the call to resultStreamWithExpectedStatus.count() in the method getResultsWithExpectedStatusSupplier: Stream.count is a terminal (reduction) operation, which consumes the stream. The supplier you return, () -> resultStreamWithExpectedStatus, then hands out that same, already consumed stream instance on every call, so wrapping it in a Supplier does not help.

    As stated in e.g. this answer, you cannot get a stream's size without consuming it. Fix it by storing the filtered items in a collection (e.g. with Collectors.toList), querying the size there, and returning the collection itself rather than the stream.

    On a side note, I think you overuse Optional. The code could be simpler if you passed empty streams (or, even better, a filtered collection that may be empty).
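
To make the answer concrete, here is a minimal sketch of both the failure and the suggested fix. It is not the asker's actual code: the Result class, the method names, and the sample data are hypothetical stand-ins, and the filter is simplified (it drops null results, which the original filter does not). The first method shows that a Supplier returning one captured Stream instance fails on the second terminal operation. The second collects the filtered items into a List, which can be sized and streamed as often as needed.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamReuseSketch {

    // Hypothetical stand-in for the question's Result class.
    static final class Result {
        private final String name;
        private final String status;

        Result(String name, String status) {
            this.name = name;
            this.status = status;
        }

        String getName() { return name; }
        String getStatus() { return status; }
    }

    // The bug in miniature: the supplier returns the SAME Stream instance each
    // time, so the second terminal operation throws IllegalStateException.
    static boolean reusingOneStreamFails(List<Result> results) {
        Stream<Result> stream = results.stream();
        Supplier<Stream<Result>> supplier = () -> stream;
        supplier.get().count(); // first terminal operation consumes the stream
        try {
            supplier.get().count(); // same instance, already consumed
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // The fix: collect once. A List can be sized and re-streamed freely.
    // Simplification (assumption): null results and null statuses are dropped.
    static List<Result> withStatus(List<Result> results, String expectedStatus) {
        return results.stream()
                .filter(r -> r != null && r.getStatus() != null
                        && r.getStatus().equalsIgnoreCase(expectedStatus))
                .collect(Collectors.toList());
    }

    // An empty list needs no Optional: size() is simply 0 and nothing is printed.
    static void printDetails(List<Result> matches, String status) {
        System.out.printf("Number of results with status '%s': %d%n", status, matches.size());
        matches.stream()
                .sorted(Comparator.comparing(Result::getName))
                .forEach(r -> System.out.println("  " + r.getName()));
    }

    public static void main(String[] args) {
        List<Result> results = List.of(
                new Result("b", "OPEN"), new Result("a", "open"), new Result("c", "CLOSED"));
        System.out.println("Reusing one stream fails: " + reusingOneStreamFails(results));
        printDetails(withStatus(results, "open"), "open");
        printDetails(withStatus(results, "missing"), "missing");
    }
}
```

If a Supplier is still wanted, a lambda such as () -> list.stream() would create a fresh stream on every call, which is what the original code presumably intended.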