I've encountered an issue trying to loop through the parts of the StreamingFileUpload provided by Micronaut and reading the bytes into a ByteArrayOutputStream (so it can be batched together and sent to S3) which cause either a NullPointerException or IndexOutOfBoundsException.
It occasionally fails on the first ever upload on application start but subsequent uploads seem to be OK (issues have never been encountered for any upload after the first). Unclear if there's a fault in how I've implemented the ByteArrayOutputStream.
final baos = new ByteArrayOutputStream()
baos.withCloseable { stream ->
Flowable.fromPublisher( file )
.all( { partData ->
stream.write( partData.bytes )
true
} ).blockingGet()
}
Exceptions
ERROR i.m.h.s.netty.RoutingInBoundHandler - Unexpected error occurred: null
java.lang.NullPointerException: null
at io.netty.buffer.CompositeByteBuf.updateComponentOffsets(CompositeByteBuf.java:573)
at io.netty.buffer.CompositeByteBuf.removeComponent(CompositeByteBuf.java:593)
Alternate variation of error
ERROR i.m.h.s.netty.RoutingInBoundHandler - Unexpected error occurred: cIndex: 14 (expected: >= 0 && <= numComponents(13))
java.lang.IndexOutOfBoundsException: cIndex: 14 (expected: >= 0 && <= numComponents(13))
at io.netty.buffer.CompositeByteBuf.checkComponentIndex(CompositeByteBuf.java:548)
The errors were due to sample code handling parts incorrectly and incorrect usage of .blockingGet
, access to the part data is not a thread safe operation.
Credit to jameskleeh for the solution.
@Override
Single<HttpResponse<String>> upload(StreamingFileUpload file) {
log.info("Received file upload for file: ${file.filename}")
int partCounter = 0
Single.<HttpResponse<String>>create({ emitter ->
ByteArrayOutputStream baos = new ByteArrayOutputStream()
baos.withCloseable {
file.subscribe(new Subscriber<PartData>() {
private Subscription s
@Override
void onSubscribe(Subscription s) {
this.s = s
s.request(1)
}
@Override
void onNext(PartData partData) {
log.info("Processing part ${partCounter++}")
baos.write(partData.bytes)
s.request(1)
}
@Override
void onError(Throwable t) {
emitter.onError(t)
}
@Override
void onComplete() {
log.info("Successfully streamed file. Output length: ${baos.size()}")
emitter.onSuccess(HttpResponse.ok('Success'))
}
})
}
})
}
https://github.com/micronaut-projects/micronaut-core/issues/2488