
How do you read and print a chunked HTTP response using java.net.http as chunks arrive?


Java 11 introduced a new package, java.net.http, for making HTTP requests. For general usage, it's pretty straightforward.

My question is: how do I use java.net.http to handle chunked responses as each chunk is received by the client?

java.net.http contains a reactive BodySubscriber, which appears to be what I want, but I can't find an example of how it's used.

http_get_demo.py

Below is a Python implementation that prints chunks as they arrive. I'd like to do the same thing with java.net.http:

import argparse
import requests


def main(url: str):
    with requests.get(url, stream=True) as r:
        for c in r.iter_content(chunk_size=1):
            print(c.decode("UTF-8"), end="")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Read from a URL and print as text as chunks arrive")
    parser.add_argument('url', type=str, help="A URL to read from")
    args = parser.parse_args()

    main(args.url)

HttpGetDemo.java

Just for completeness, here's a simple example of making a blocking request using java.net.http:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;

public class HttpGetDemo {

  public static void main(String[] args) throws Exception {

    var request = HttpRequest.newBuilder()
            .uri(URI.create(args[0]))
            .build();

    var bodyHandler = HttpResponse.BodyHandlers
            .ofString();

    var client = HttpClient.newHttpClient();
    var response = client.send(request, bodyHandler);
    System.out.println(response.body());

  }
}

HttpAsyncGetDemo.java

And here's the same example making a non-blocking/async request:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpRequest;

/**
 * Sends a GET request asynchronously and prints the body once it has fully arrived.
 */
public class HttpAsyncGetDemo {

  public static void main(String[] args) throws Exception {

    var request = HttpRequest.newBuilder()
            .uri(URI.create(args[0]))
            .build();

    var bodyHandler = HttpResponse.BodyHandlers
            .ofString();

    var client = HttpClient.newHttpClient();

    client.sendAsync(request, bodyHandler)
            .thenApply(HttpResponse::body)
            .thenAccept(System.out::println)
            .join();

  }
}

Solution

  • Thanks to @pavel and @chegar999 for their partial answers. They led me to my solution.

    Overview

    The solution I came up with is below. It uses a custom java.net.http.HttpResponse.BodySubscriber. A BodySubscriber has the reactive callbacks (onSubscribe, onNext, onError, and onComplete) plus a getBody method that returns a CompletionStage, which eventually produces the body of the HTTP response. Once you have your BodySubscriber in hand, you can use it like this:

    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(uri))
        .build();
    
    return client.sendAsync(request, responseInfo -> new StringSubscriber())
        .whenComplete((r, t) -> {
          // r is null if the request failed, so only read the status on success
          if (t == null) System.out.println("--- Status code " + r.statusCode());
        })
        .thenApply(HttpResponse::body);
    

    Note the line:

    client.sendAsync(request, responseInfo -> new StringSubscriber())

    That's where we register our custom BodySubscriber; in this case, my custom class is named StringSubscriber.
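
The second argument to sendAsync is a BodyHandler, so one of the JDK's built-in handlers can go in the same slot. If the goal is only to react to body bytes as they arrive, a minimal sketch using BodyHandlers.ofByteArrayConsumer might look like the following. The class name ByteArrayConsumerSketch and the stream helper are hypothetical names chosen for illustration, and the sketch uses the blocking send call to keep it short.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Consumer;

class ByteArrayConsumerSketch {

  // Hypothetical helper: passes each piece of the response body to `sink`
  // as it arrives and returns the status code once the body is complete.
  static int stream(String uri, Consumer<byte[]> sink) throws Exception {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(uri))
        .build();

    // The client calls the consumer with Optional.of(bytes) for each piece of
    // body data and with Optional.empty() once the body has ended.
    Consumer<Optional<byte[]>> consumer = maybeBytes -> maybeBytes.ifPresent(sink);

    HttpResponse<Void> response =
        client.send(request, HttpResponse.BodyHandlers.ofByteArrayConsumer(consumer));
    return response.statusCode();
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      System.err.println("usage: java ByteArrayConsumerSketch.java <url>");
      return;
    }
    int status = stream(args[0],
        bytes -> System.out.print(new String(bytes, StandardCharsets.UTF_8)));
    System.out.println("\n--- Status code " + status);
  }
}
```

The pieces handed to the consumer follow how data arrives from the network, so they are not guaranteed to line up one-to-one with the server's HTTP chunks. Unlike the custom subscriber, this variant does not collect the body; the response body type is Void.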

    CustomSubscriber.java

    This is a complete working example. With Java 11, you can run it without compiling it first. Just paste it into a file named CustomSubscriber.java, then run the command java CustomSubscriber.java <some url>. It prints the contents of each chunk as it arrives. It also collects the chunks and returns them as the body when the response has completed.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.net.http.HttpResponse.BodySubscriber;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.Flow;
    
    public class CustomSubscriber {
    
      public static void main(String[] args) {
        CustomSubscriber cs = new CustomSubscriber();
        String body = cs.get(args[0]).join();
        System.out.println("--- Response body:\n: ..." + body + "...");
      }
    
      public CompletableFuture<String> get(String uri) {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(uri))
            .build();
    
        return client.sendAsync(request, responseInfo -> new StringSubscriber())
            .whenComplete((r, t) -> {
              // r is null if the request failed, so only read the status on success
              if (t == null) System.out.println("--- Status code " + r.statusCode());
            })
            .thenApply(HttpResponse::body);
      }
    
      static class StringSubscriber implements BodySubscriber<String> {
    
        final CompletableFuture<String> bodyCF = new CompletableFuture<>();
        Flow.Subscription subscription;
        List<ByteBuffer> responseData = new CopyOnWriteArrayList<>();
    
        @Override
        public CompletionStage<String> getBody() {
          return bodyCF;
        }
    
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
          this.subscription = subscription;
          subscription.request(1); // Request first item
        }
    
        @Override
        public void onNext(List<ByteBuffer> buffers) {
          System.out.println("-- onNext " + buffers);
          try {
            System.out.println("\tBuffer Content:\n" + asString(buffers));
          } 
          catch (Exception e) {
            System.out.println("\tUnable to print buffer content");
          }
          buffers.forEach(ByteBuffer::rewind); // asString() advanced each position; reset so onComplete() can re-read
          responseData.addAll(buffers);
          subscription.request(1); // Request next item
        }
    
        @Override
        public void onError(Throwable throwable) {
          bodyCF.completeExceptionally(throwable);
        }
    
        @Override
        public void onComplete() {
          bodyCF.complete(asString(responseData));
        }
    
        private String asString(List<ByteBuffer> buffers) {
          return new String(toBytes(buffers), StandardCharsets.UTF_8);
        }
    
        private byte[] toBytes(List<ByteBuffer> buffers) {
          int size = buffers.stream()
              .mapToInt(ByteBuffer::remaining)
              .sum();
          byte[] bs = new byte[size];
          int offset = 0;
          for (ByteBuffer buffer : buffers) {
            int remaining = buffer.remaining();
            buffer.get(bs, offset, remaining);
            offset += remaining;
          }
          return bs;
        }
    
      }
    }
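
One caveat with printing asString(buffers) inside onNext: a multi-byte UTF-8 character can be split across two deliveries, in which case decoding each batch on its own produces replacement characters at the seam. A minimal sketch of one way around that, using a stateful java.nio.charset.CharsetDecoder, is shown here. IncrementalUtf8Decoder is a hypothetical helper written for illustration; it is not part of the original answer or of java.net.http.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.List;

class IncrementalUtf8Decoder {

  private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
      .onMalformedInput(CodingErrorAction.REPLACE)
      .onUnmappableCharacter(CodingErrorAction.REPLACE);

  // Bytes of a character that was cut off at the end of the previous batch.
  private ByteBuffer leftover = ByteBuffer.allocate(0);

  // Decodes one batch of buffers (as passed to onNext) and returns only the
  // characters that are complete so far.
  String decode(List<ByteBuffer> buffers) {
    int size = leftover.remaining();
    for (ByteBuffer b : buffers) {
      size += b.remaining();
    }

    ByteBuffer in = ByteBuffer.allocate(size);
    in.put(leftover);
    for (ByteBuffer b : buffers) {
      in.put(b.duplicate()); // duplicate() leaves the caller's position untouched
    }
    in.flip();

    CharBuffer out = CharBuffer.allocate(size); // UTF-8 never yields more chars than bytes
    decoder.decode(in, out, false);             // false: more input may follow

    // Keep any undecoded tail bytes for the next call.
    leftover = ByteBuffer.allocate(in.remaining());
    leftover.put(in);
    leftover.flip();

    out.flip();
    return out.toString();
  }

  // Call once when the body has ended (for example from onComplete) to
  // surface a character that was truncated at the very end of the stream.
  String finish() {
    CharBuffer out = CharBuffer.allocate(leftover.remaining() + 1);
    decoder.decode(leftover, out, true);
    decoder.flush(out);
    out.flip();
    return out.toString();
  }

  public static void main(String[] args) {
    byte[] bytes = "h\u00e9llo".getBytes(StandardCharsets.UTF_8);
    IncrementalUtf8Decoder d = new IncrementalUtf8Decoder();
    // Split the input in the middle of the two-byte character.
    System.out.println(d.decode(List.of(ByteBuffer.wrap(bytes, 0, 2))));
    System.out.println(d.decode(List.of(ByteBuffer.wrap(bytes, 2, 4))));
  }
}
```

Inside StringSubscriber, onNext could print the result of decode(buffers) in place of asString(buffers). Because decode reads from duplicates, the buffers keep their positions and can still be added to responseData afterwards.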
    

    Trying it out

    To test this solution, you'll need a server that sends a response with Transfer-Encoding: chunked and sends it slowly enough that you can watch the chunks arrive. I've created one at https://github.com/hohonuuli/demo-chunk-server, and you can spin it up using Docker like so:

    docker run -p 8080:8080 hohonuuli/demo-chunk-server

    Then run the CustomSubscriber.java code using java CustomSubscriber.java http://localhost:8080/chunk/10
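
If the response is line-oriented text, the closest built-in counterpart to the Python loop in the question is probably BodyHandlers.ofLines(). With that handler, send returns once the headers are in, and the resulting stream yields lines as the data arrives. The following is a rough sketch under that assumption; LinesSketch and forEachLine are made-up names, and it assumes the server ends each piece of text with a newline.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.function.Consumer;
import java.util.stream.Stream;

class LinesSketch {

  // Hypothetical helper: hands each line of the response body to `sink`
  // as soon as it is available and returns the status code at the end.
  static int forEachLine(String uri, Consumer<String> sink) throws Exception {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(uri))
        .build();

    // send() returns after the headers are read; the body is pulled lazily
    // through the stream while forEach is running.
    HttpResponse<Stream<String>> response =
        client.send(request, HttpResponse.BodyHandlers.ofLines());

    // Closing the stream releases the underlying connection resources.
    try (Stream<String> lines = response.body()) {
      lines.forEach(sink);
    }
    return response.statusCode();
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      System.err.println("usage: java LinesSketch.java <url>");
      return;
    }
    int status = forEachLine(args[0], System.out::println);
    System.out.println("--- Status code " + status);
  }
}
```

The trade-off compared with the custom subscriber is granularity: output appears per line, not per delivered buffer, so a chunk without a trailing newline stays buffered until the rest of its line arrives.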