While following a tutorial on the JDK 11 HttpClient, using the https://httpstat.us/500?sleep=1000 endpoint, which returns HTTP 500 after 1 second, I prepared the following piece of code:
    HttpClient client = HttpClient.newHttpClient();
    var futures = Stream.of(
            "https://httpstat.us/500?sleep=1000",
            "https://httpstat.us/500?sleep=1000",
            "https://httpstat.us/500?sleep=1000"
        ).map(link -> client
            .sendAsync(
                newBuilder(URI.create(link)).GET().build(),
                HttpResponse.BodyHandlers.discarding()
            ).thenApply(HttpResponse::statusCode)
        ).collect(Collectors.toList());
    futures.stream().map(CompletableFuture::join).forEach(System.out::println);
and it works fine. The program runs in about 1.5 s, and the output for all three calls appears in the terminal at the same time, as expected.
But when I change it to
    HttpClient client = HttpClient.newHttpClient();
    Stream.of(
            "https://httpstat.us/500?sleep=1000",
            "https://httpstat.us/500?sleep=1000",
            "https://httpstat.us/500?sleep=1000"
        ).map(link -> client
            .sendAsync(
                newBuilder(URI.create(link)).GET().build(),
                HttpResponse.BodyHandlers.discarding()
            ).thenApply(HttpResponse::statusCode)
        ).map(CompletableFuture::join).forEach(System.out::println);
it no longer seems to run asynchronously: the three 500 responses are printed one by one, with a 1-second delay before each.
Why? What am I missing here?
This is because the map method on a Java Stream is an "intermediate operation", and therefore lazy. This means the Function passed to it is not invoked on an element of the stream until something downstream from it consumes that element.
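The laziness is easy to observe without any HTTP calls. The following is a minimal sketch, not taken from the question: the class name LazyMapDemo and the trace() helper are made up for illustration. It records the order in which the stages of a sequential stream run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class; the names here are illustrative only.
class LazyMapDemo {

    // Returns a log of the order in which the pipeline stages executed.
    static List<String> trace() {
        List<String> log = new ArrayList<>();

        // Building the pipeline does not invoke the mapping function yet.
        Stream<String> pipeline = Stream.of("a", "b")
                .map(s -> {
                    log.add("map " + s);
                    return s.toUpperCase();
                });
        log.add("pipeline built");

        // The terminal operation pulls elements through map() one at a time.
        pipeline.forEach(s -> log.add("forEach " + s));
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The log starts with "pipeline built", and each element then passes through map and forEach before the next element is touched: "map a", "forEach A", "map b", "forEach B".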
This is described in the JavaDoc section called "Stream operations and pipelines" (with my comments added in square brackets):
Intermediate operations return a new stream. They are always lazy; executing an intermediate operation such as filter() [or map()] does not actually perform any filtering [or mapping], but instead creates a new stream that, when traversed, contains the elements of the initial stream that match the given predicate [or are transformed by the given function in the case of map()]. Traversal of the pipeline source does not begin until the terminal operation of the pipeline is executed.
In this case, it means that the requests aren't made until the stream is consumed.
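In the first example, collect() is the terminal operation that consumes the stream. It pulls every element through map before returning, so all three requests have been sent by the time the first join is called. The result is a list of CompletableFuture objects that represent the running requests.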
In the second example, forEach is the terminal operation that consumes each element of the stream, one by one. Because the join operation is contained within that stream, each join completes before the element is passed on to forEach. Subsequent elements are consumed sequentially, and therefore each request is not even made until the prior request completes.
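The difference between the two versions can be reproduced offline. This is a rough sketch under stated assumptions: fakeRequest is a hypothetical stand-in for client.sendAsync(...) that completes with 500 after a short delay, and the class and method names are invented for the demo. Log entries are written only from the calling thread, so the recorded order is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; it simulates the requests instead of using HttpClient.
class JoinOrderDemo {

    // Stand-in for client.sendAsync(...): logs the start on the calling thread,
    // then completes with status 500 on a background thread after 100 ms.
    static CompletableFuture<Integer> fakeRequest(int id, List<String> log) {
        log.add("start " + id);
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 500;
        });
    }

    // Mirrors the second snippet: join() sits in the same pipeline as the request.
    static List<String> singlePipeline() {
        List<String> log = new ArrayList<>();
        Stream.of(1, 2, 3)
                .map(id -> fakeRequest(id, log))
                .map(CompletableFuture::join)
                .forEach(status -> log.add("done " + status));
        return log;
    }

    // Mirrors the first snippet: collect() starts every request before any join().
    static List<String> collectFirst() {
        List<String> log = new ArrayList<>();
        List<CompletableFuture<Integer>> futures = Stream.of(1, 2, 3)
                .map(id -> fakeRequest(id, log))
                .collect(Collectors.toList());
        futures.stream()
                .map(CompletableFuture::join)
                .forEach(status -> log.add("done " + status));
        return log;
    }

    public static void main(String[] args) {
        System.out.println("single pipeline: " + singlePipeline());
        System.out.println("collect first:   " + collectFirst());
    }
}
```

In the single pipeline, each "start" is followed by its "done" before the next request begins. With the intermediate collect, all three "start" entries come first, which is why the original version finishes in roughly the time of one request.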