
Vert.x ReadStream<Buffer> to InputStream


I am trying to set up Vert.x with Jersey to handle POST data (not necessarily form data).

Jersey's ContainerRequest.setEntityStream takes an InputStream, which is what I am trying to build. However, I can't find a way to pass that data along without reading the whole body into memory, either with bodyHandler or with my own custom method that does something similar but limits the input size:

    final Buffer body = Buffer.buffer();
    event
        .handler(buffer -> {
            if (!event.response().headWritten()) {
                body.appendBuffer(buffer);
                if (body.length() > 10 * 1024 * 1024) {
                    event.response()
                        .setStatusCode(REQUEST_ENTITY_TOO_LARGE.getStatusCode())
                        .setStatusMessage(REQUEST_ENTITY_TOO_LARGE.getReasonPhrase())
                        .end();
                }
            }
        })
        .endHandler(aVoid -> {
            request.setEntityStream(new VertxBufferInputStream(body));
            appHandler.handle(request);
        });

VertxBufferInputStream is a simple wrapper around the Vert.x Buffer. It just saves some memory by avoiding the conversion to a ByteArrayInputStream, but it still holds the whole body.

I want to stream the body instead of holding all of it in memory. I've tried a few things, all of them hacky, and none of them work: each attempt ends up blocking the event loop, because the read waits for data from a handler that never gets called.


Solution

  • Three components are needed.

    1. Separate any processing that may block from the event loop by running it with vertx.executeBlocking. See https://github.com/trajano/app-ms/blob/7f1de326683473839ffe85fe711cbe719f7a0a74/ms-engine/src/main/java/net/trajano/ms/engine/JaxRsRoute.java#L128

    2. Handle two events: when a new buffer of data comes in, and when the input ends. See https://github.com/trajano/app-ms/blob/7f1de326683473839ffe85fe711cbe719f7a0a74/ms-engine/src/main/java/net/trajano/ms/engine/JaxRsRoute.java#L123

    3. Implement an InputStream that can accept data from another thread, blocks when no data is available, and can be told that there is no more input. See https://github.com/trajano/app-ms/blob/master/ms-engine/src/main/java/net/trajano/ms/engine/internal/VertxBlockingInputStream.java
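
To illustrate the third component, here is a minimal sketch of such a stream using only the JDK. It is not the linked VertxBlockingInputStream: the class name QueueBackedInputStream and the methods offer and end are hypothetical, chosen for this example. The assumption is that the Vert.x handler would call offer(buffer.getBytes()) for each buffer and the endHandler would call end(), while Jersey reads from the stream inside the executeBlocking code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: an InputStream fed chunk by chunk from another thread. */
final class QueueBackedInputStream extends InputStream {

    /** Sentinel chunk marking the end of input; compared by identity only. */
    private static final byte[] END = new byte[0];

    private final BlockingQueue<byte[]> chunks = new LinkedBlockingQueue<>();
    private byte[] current = new byte[0];
    private int position = 0;
    private boolean finished = false;

    /** Producer side (e.g. the event loop): queue one chunk. Never blocks. */
    public void offer(byte[] chunk) {
        if (chunk.length > 0) {
            chunks.add(chunk);
        }
    }

    /** Producer side: signal that no more data will arrive. */
    public void end() {
        chunks.add(END);
    }

    /** Consumer side: blocks until a byte is available or input has ended. */
    @Override
    public int read() throws IOException {
        if (!fill()) {
            return -1;
        }
        return current[position++] & 0xFF;
    }

    /** Consumer side: copies as much of the current chunk as fits into b. */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        if (!fill()) {
            return -1;
        }
        int n = Math.min(len, current.length - position);
        System.arraycopy(current, position, b, off, n);
        position += n;
        return n;
    }

    /** Makes sure 'current' has unread bytes; returns false at end of stream. */
    private boolean fill() throws IOException {
        while (position >= current.length) {
            if (finished) {
                return false;
            }
            try {
                byte[] next = chunks.take(); // blocks the reading thread only
                if (next == END) {
                    finished = true;
                    return false;
                }
                current = next;
                position = 0;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", e);
            }
        }
        return true;
    }
}
```

Note that the queue in this sketch is unbounded, so a slow reader can still let the body pile up in memory. Bounding it would likely mean calling pause() and resume() on the request when the queue grows or drains, which is left out here.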