
How to create a custom BodyPublisher for Java 11 HttpRequest


I'm trying to create a custom BodyPublisher that would serialize my JSON object. I could just serialize the JSON when I create the request and use the ofByteArray method of BodyPublishers, but I would rather use a custom publisher.

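import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

import com.fasterxml.jackson.databind.node.ObjectNode;

public class CustomPublisher implements HttpRequest.BodyPublisher {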
    private byte[] bytes;
    
    public CustomPublisher(ObjectNode jsonData) {
        ...
        // Serialize jsonData to bytes
        ...
    }
    
    @Override
    public long contentLength() {
        if(bytes == null) return 0;
        return bytes.length;
    }
    
    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        CustomSubscription subscription = new CustomSubscription(subscriber, bytes);
        subscriber.onSubscribe(subscription);       
    }

    private static class CustomSubscription implements Flow.Subscription {
         private final Flow.Subscriber<? super ByteBuffer> subscriber;
         private boolean cancelled;
         private Iterator<Byte> byterator;

         private CustomSubscription(Flow.Subscriber<? super ByteBuffer> subscriber, byte[] bytes) {
             this.subscriber = subscriber;
             this.cancelled = false;
             List<Byte> bytelist = new ArrayList<>();
             for(byte b : bytes) {
                 bytelist.add(b);
             }
             this.byterator = bytelist.iterator();
         }

         @Override
         public void request(long n) {
             if(cancelled) return;
             if(n <= 0) { // a non-positive request is an error under the Flow contract
                 subscriber.onError(new IllegalArgumentException());
             } else if(byterator.hasNext()) {
                 subscriber.onNext(ByteBuffer.wrap(new byte[]{byterator.next()}));
             } else {
                 subscriber.onComplete();
             }
         }

         @Override
         public void cancel() {
             this.cancelled = true;
         }
    }
}

This implementation works, but only if the subscription's request method is called with 1 as its argument, which is what happens when I use it with HttpRequest.

I'm pretty sure this is not the preferred or optimal way to create a custom subscription, but I have yet to find a better way to make it work.

I would greatly appreciate it if anyone could point me to a better approach.


Solution

  • You are right to avoid making a byte array out of it, as that would create memory issues for large objects.

    I wouldn’t try to write a custom publisher. Rather, just take advantage of the factory method HttpRequest.BodyPublishers.ofInputStream.

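    HttpRequest.BodyPublisher publisher =
        HttpRequest.BodyPublishers.ofInputStream(() -> {
            PipedInputStream in = new PipedInputStream();
            PipedOutputStream out;
            try {
                // Connect the pipe before handing out the input side, so an
                // early read cannot fail with "Pipe not connected".
                out = new PipedOutputStream(in);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }

            // Serialize on another thread while the HTTP client reads from 'in'.
            ForkJoinPool.commonPool().submit(() -> {
                try (out) {
                    objectMapper.writeTree(
                        objectMapper.getFactory().createGenerator(out),
                        jsonData);
                }
                return null;
            });

            return in;
        });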
    

    As you have noted, you can use HttpRequest.BodyPublishers.ofByteArray. That is fine for relatively small objects, but I program for scalability out of habit. The problem with assuming code won’t need to scale is that other developers will assume it is safe to pass large objects, without realizing the impact on performance.

    Writing your own body publisher will be a lot of work. Its subscribe method is inherited from Flow.Publisher.

    The documentation for the subscribe method starts with this:

    Adds the given Subscriber if possible.

    Each time your subscribe method is called, you need to add the Subscriber to some sort of collection, create an implementation of Flow.Subscription, and immediately pass it to the subscriber's onSubscribe method. Your Subscription implementation needs to send back one or more ByteBuffers, but only when its request method is called, by invoking the corresponding Subscriber's (not just any Subscriber's) onNext method. Once you've sent all of the data, you must call the same Subscriber's onComplete() method. On top of that, the Subscription implementation needs to handle cancel requests.
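
As a rough illustration of those obligations, here is a minimal sketch of a publisher over an in-memory byte array whose subscription honors the requested demand instead of assuming request(1). The class name ChunkedBytesPublisher and the 8192-byte chunk size are assumptions made for this example; it is not how the JDK's own publishers are written.

```java
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;

// Hypothetical class: publishes a byte array in chunks, one chunk per unit of demand.
final class ChunkedBytesPublisher implements HttpRequest.BodyPublisher {
    private static final int CHUNK_SIZE = 8192; // assumed chunk size

    private final byte[] bytes;

    ChunkedBytesPublisher(byte[] bytes) {
        this.bytes = bytes.clone();
    }

    @Override
    public long contentLength() {
        return bytes.length;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        // Every subscriber gets its own subscription, starting from offset 0.
        subscriber.onSubscribe(new ChunkSubscription(subscriber));
    }

    private final class ChunkSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super ByteBuffer> subscriber;
        private int position;     // next byte to send
        private long demand;      // chunks requested but not yet sent
        private boolean done;     // completed, failed, or cancelled
        private boolean emitting; // guards against re-entrant request() calls

        ChunkSubscription(Flow.Subscriber<? super ByteBuffer> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public synchronized void request(long n) {
            if (done) return;
            if (n <= 0) {
                done = true;
                subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                return;
            }
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
            if (emitting) return; // called from inside onNext; the outer loop continues
            emitting = true;
            try {
                while (demand > 0 && !done && position < bytes.length) {
                    int len = Math.min(CHUNK_SIZE, bytes.length - position);
                    ByteBuffer chunk = ByteBuffer.wrap(bytes, position, len);
                    position += len;
                    demand--;
                    subscriber.onNext(chunk);
                }
                if (!done && position == bytes.length) {
                    done = true;
                    subscriber.onComplete();
                }
            } finally {
                emitting = false;
            }
        }

        @Override
        public synchronized void cancel() {
            done = true;
        }
    }
}
```

Even this small version has to track demand, re-entrancy, completion, and cancellation, which is the work the built-in factory methods save you.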

    You can make a lot of this easier by extending SubmissionPublisher, a ready-made implementation of Flow.Publisher, declaring that your subclass implements HttpRequest.BodyPublisher, and adding a contentLength() method to it. But as the SubmissionPublisher documentation shows, you still have a fair amount of work to do for even a minimal working implementation.
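
A minimal sketch of that idea might look like the following. The class name SubmissionBodyPublisher is hypothetical, and the caller is assumed to know the content length up front. Keep in mind that SubmissionPublisher only delivers items submitted after a subscriber has attached, so the data has to be submitted once the HTTP client has subscribed. This sketch leaves that coordination out, and it is a good part of the remaining work.

```java
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical class: SubmissionPublisher supplies subscribe(), buffering, and
// demand tracking; only contentLength() has to be added for BodyPublisher.
final class SubmissionBodyPublisher extends SubmissionPublisher<ByteBuffer>
        implements HttpRequest.BodyPublisher {

    private final long contentLength;

    SubmissionBodyPublisher(long contentLength) {
        this.contentLength = contentLength;
    }

    @Override
    public long contentLength() {
        return contentLength;
    }
}
```

After a subscriber has attached, calling submit(ByteBuffer) for each piece of the body and then close() delivers the data followed by onComplete.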

    The HttpRequest.BodyPublishers.of… methods will do all of this for you. ofByteArray is okay for small objects, but ofInputStream will work for any object you could ever pass in.
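
One practical difference between the two factory methods is worth seeing in code: ofByteArray knows its length, while ofInputStream reports an unknown length (-1), which as far as I know makes the client send the body with chunked transfer encoding over HTTP/1.1. The helper class PublisherLengths below is a made-up name for this sketch, and it uses a ByteArrayInputStream as a stand-in for a real streaming source.

```java
import java.io.ByteArrayInputStream;
import java.net.http.HttpRequest;

// Hypothetical helper that just exposes what each built-in publisher reports.
final class PublisherLengths {

    // ofByteArray has the whole body in memory, so the length is known.
    static long byteArrayLength(byte[] body) {
        return HttpRequest.BodyPublishers.ofByteArray(body).contentLength();
    }

    // ofInputStream cannot know how much the stream will produce.
    static long inputStreamLength(byte[] body) {
        return HttpRequest.BodyPublishers
                .ofInputStream(() -> new ByteArrayInputStream(body))
                .contentLength();
    }
}
```

If the receiving server insists on a Content-Length header, that is a reason to prefer ofByteArray for small bodies despite the extra memory.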