Failing to process StreamingFileUpload


I've hit an issue while looping through the parts of the StreamingFileUpload provided by Micronaut and writing their bytes into a ByteArrayOutputStream (so they can be batched together and sent to S3). It fails with either a NullPointerException or an IndexOutOfBoundsException.

It occasionally fails on the very first upload after application start, but subsequent uploads seem to be OK (no upload after the first has ever failed). It's unclear whether there's a fault in how I've used the ByteArrayOutputStream.

final baos = new ByteArrayOutputStream()
baos.withCloseable { stream ->
    Flowable.fromPublisher( file )
            .all( { partData ->
                stream.write( partData.bytes )
                true
            } ).blockingGet()
}

Exceptions

ERROR i.m.h.s.netty.RoutingInBoundHandler - Unexpected error occurred: null
java.lang.NullPointerException: null
        at io.netty.buffer.CompositeByteBuf.updateComponentOffsets(CompositeByteBuf.java:573)
        at io.netty.buffer.CompositeByteBuf.removeComponent(CompositeByteBuf.java:593)

Alternate variation of error

ERROR i.m.h.s.netty.RoutingInBoundHandler - Unexpected error occurred: cIndex: 14 (expected: >= 0 && <= numComponents(13))
java.lang.IndexOutOfBoundsException: cIndex: 14 (expected: >= 0 && <= numComponents(13))
        at io.netty.buffer.CompositeByteBuf.checkComponentIndex(CompositeByteBuf.java:548)

Sample project


Solution

  • The errors were caused by the sample code handling the parts incorrectly and by incorrect use of .blockingGet. Access to the part data is not a thread-safe operation.

    Credit to jameskleeh for the solution.

    @Override
    Single<HttpResponse<String>> upload(StreamingFileUpload file) {
        log.info("Received file upload for file: ${file.filename}")
        int partCounter = 0
        Single.<HttpResponse<String>>create({ emitter ->
            ByteArrayOutputStream baos = new ByteArrayOutputStream()
            baos.withCloseable {
                file.subscribe(new Subscriber<PartData>() {
                    private Subscription s
    
                    @Override
                    void onSubscribe(Subscription s) {
                        this.s = s
                        s.request(1)
                    }
    
                    @Override
                    void onNext(PartData partData) {
                        log.info("Processing part ${partCounter++}")
                        baos.write(partData.bytes)
                        s.request(1)
                    }
    
                    @Override
                    void onError(Throwable t) {
                        emitter.onError(t)
                    }
    
                    @Override
                    void onComplete() {
                        log.info("Successfully streamed file. Output length: ${baos.size()}")
                        emitter.onSuccess(HttpResponse.ok('Success'))
                    }
                })
            }
        })
    }
    

    https://github.com/micronaut-projects/micronaut-core/issues/2488
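
For anyone who wants to try the request-one-part-at-a-time pattern without a running Micronaut app, here is a minimal sketch in plain Java. It is an illustration only, not the code from the linked issue: it swaps Micronaut's StreamingFileUpload and PartData for the JDK's java.util.concurrent.Flow API with byte[] chunks, and the names OneAtATimeCollector, collect and runDemo are made up for the example. The idea is the same as in the subscriber above: ask for one chunk, write it, and only then ask for the next, so chunks are never handled concurrently.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the controller logic; uses the JDK Flow API
// instead of Micronaut's StreamingFileUpload (an assumption for this sketch).
class OneAtATimeCollector {

    // Collects every chunk into a single byte array, requesting one chunk at a time.
    static CompletableFuture<byte[]> collect(Flow.Publisher<byte[]> parts) {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();

        parts.subscribe(new Flow.Subscriber<byte[]>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                this.subscription = s;
                s.request(1); // ask for the first chunk only
            }

            @Override
            public void onNext(byte[] chunk) {
                baos.write(chunk, 0, chunk.length);
                subscription.request(1); // ask for the next chunk once this one is written
            }

            @Override
            public void onError(Throwable t) {
                result.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                result.complete(baos.toByteArray());
            }
        });
        return result;
    }

    // Feeds three small chunks through the collector and returns the joined text.
    static String runDemo() {
        CompletableFuture<byte[]> done;
        try (SubmissionPublisher<byte[]> publisher = new SubmissionPublisher<>()) {
            done = collect(publisher);
            for (String s : List.of("part-1;", "part-2;", "part-3")) {
                publisher.submit(s.getBytes(StandardCharsets.UTF_8));
            }
        } // closing the publisher signals onComplete after the submitted chunks
        // Blocking is acceptable here only because this is a demo thread,
        // not a server event-loop thread.
        byte[] all = done.orTimeout(5, TimeUnit.SECONDS).join();
        return new String(all, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In a real controller you would return the asynchronous result (the Single in the answer above) instead of blocking on it as runDemo does.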