Even though I am using a Supplier for my streams and calling Supplier.get() each time I want to retrieve my stream and perform a terminal operation on it, I am still getting the "stream has already been operated upon or closed" exception. Could anyone please look at my code and suggest what I am doing wrong?
Method where exception is being thrown:
private static void printMyDetails(Supplier<Stream<Result>> mySupplier, String myStatus) {
    checkNotNull(mySupplier);
    checkArgument(isNotEmpty(myStatus), "Invalid My status");
    if (mySupplier.get().noneMatch(result -> true)) { //<-This is where the exception is thrown
        if (log.isInfoEnabled()) {
            log.info("Number of My with status '{}': {}", myStatus, 0);
        }
    } else {
        log.info("Number of My with status '{}': {}", myStatus, mySupplier.get().count());
        log.info("Details of My(s) with status: '{}'", myStatus);
        mySupplier.get().sorted().forEach(Utils::printMyNameAndDetails);
    }
}
Place which is calling the above method:
rb.keySet().stream().filter(key -> containsIgnoreCase(key, "status")).map(rb::getString)
        .filter(StringUtils::isNotEmpty).forEach(status -> {
            var resultsWithExpectedStatusSupplier = requireNonNull(getResultsWithExpectedStatusSupplier(results, status));
            resultsWithExpectedStatusSupplier.ifPresentOrElse(streamSupplier -> printMyDetails(streamSupplier, status), () -> {
                if (log.isInfoEnabled())
                    log.info("0 My with status: {}", status);
            });
        });
The stream supplier:
private static Optional<Supplier<Stream<Result>>> getResultsWithExpectedStatusSupplier(
        @NotNull List<Result> results, @NotNull String expectedStatus) {
    checkArgument(!results.isEmpty(), "Results list is empty");
    checkArgument(isNotEmpty(expectedStatus), "Invalid expected status");
    var resultStreamWithExpectedStatus = requireNonNull(results.stream().filter(result -> ofNullable(result).map(Result::getStatus)
            .allMatch(s -> isNotEmpty(s) && s.equalsIgnoreCase(expectedStatus))));
    return resultStreamWithExpectedStatus.count() == 0 ? Optional.empty() : Optional.of(() -> resultStreamWithExpectedStatus);
}
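The general problem is as Christian Ullenboom said: the stream has already been consumed. The exact location in your code is the call to resultStreamWithExpectedStatus.count() in the method getResultsWithExpectedStatusSupplier, as Stream.count is a reduction/terminal operation which consumes the stream. The supplier you return, () -> resultStreamWithExpectedStatus, does not create a new stream; it hands back that same, already consumed instance on every get(), so the first terminal operation in printMyDetails fails.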
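To illustrate the difference, here is a minimal sketch that uses plain strings instead of your Result type. The class and method names (StreamReuseDemo, capturedStreamFails, freshStreamCount) are made up for this example. The first method mirrors the pattern from the question, the second builds a new stream inside the supplier.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class StreamReuseDemo {

    // Mirrors the question: the lambda captures ONE stream instance,
    // and that instance has already been consumed by count().
    static boolean capturedStreamFails(List<String> items) {
        Stream<String> stream = items.stream().filter(s -> !s.isEmpty());
        stream.count(); // terminal operation, the stream is now consumed
        Supplier<Stream<String>> supplier = () -> stream; // same instance every time
        try {
            supplier.get().count(); // second terminal operation on the same stream
            return false;
        } catch (IllegalStateException e) {
            return true; // "stream has already been operated upon or closed"
        }
    }

    // The supplier builds a NEW stream from the source list on every get(),
    // so each terminal operation works on its own stream.
    static long freshStreamCount(List<String> items) {
        Supplier<Stream<String>> supplier = () -> items.stream().filter(s -> !s.isEmpty());
        supplier.get().count();        // consumes the first stream
        return supplier.get().count(); // works, this is a different stream
    }

    public static void main(String[] args) {
        List<String> items = List.of("a", "", "b");
        System.out.println(capturedStreamFails(items));
        System.out.println(freshStreamCount(items));
    }
}
```

A Supplier only helps when its body creates the stream. Wrapping an existing stream variable in a lambda just postpones handing out the same object.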
As stated in e.g. this answer, you cannot get the stream's size without consuming it. Fix it by storing the filtered items in a collection (e.g. with Collectors.toList), querying the size there, and returning the collection itself rather than the stream.
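A rough sketch of that fix could look like the following. The Result class here is a hypothetical stand-in with only a status field, and the method name resultsWithStatus is my own. I also replaced the Optional-based null handling with a plain null check to keep the example short.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilteredResults {

    // Hypothetical stand-in for the Result type from the question.
    static final class Result {
        private final String status;

        Result(String status) {
            this.status = status;
        }

        String getStatus() {
            return status;
        }
    }

    // Collects the matching results once. The returned list can be queried
    // for its size and streamed as often as needed.
    static List<Result> resultsWithStatus(List<Result> results, String expectedStatus) {
        return results.stream()
                .filter(r -> r != null
                        && r.getStatus() != null
                        && r.getStatus().equalsIgnoreCase(expectedStatus))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Result> results = Arrays.asList(
                new Result("PASSED"), new Result("failed"), null, new Result("passed"));
        List<Result> passed = resultsWithStatus(results, "passed");
        System.out.println(passed.size());           // size without consuming anything
        System.out.println(passed.stream().count()); // a fresh stream on every call
    }
}
```

Every call to passed.stream() creates a new stream over the list, so counting, matching and iterating can each use their own stream.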
On a side note, I think you overuse Optional a bit. The code could be simpler if you passed empty streams around or, even better, a filtered collection that may be empty.
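As a sketch of what the printing side could look like without Optional or Supplier: the method below takes the (possibly empty) filtered list directly. It works on plain strings and returns the lines instead of logging them, purely so the example is self-contained. The names PrintDetailsSketch and describe are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class PrintDetailsSketch {

    // Builds the output lines for one status. An empty list needs no special
    // wrapper type: size() is simply 0 and no details are added.
    static List<String> describe(List<String> names, String status) {
        List<String> lines = new ArrayList<>();
        lines.add("Number of results with status '" + status + "': " + names.size());
        if (!names.isEmpty()) {
            lines.add("Details of result(s) with status: '" + status + "'");
            names.stream().sorted().forEach(lines::add); // new stream, used once
        }
        return lines;
    }

    public static void main(String[] args) {
        describe(List.of("beta", "alpha"), "passed").forEach(System.out::println);
        describe(List.of(), "failed").forEach(System.out::println);
    }
}
```

With this shape the caller no longer needs ifPresentOrElse, because the empty case and the non-empty case go through the same method.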