Search code examples
javaspring-webfluxspring-webclient

Webflux Webclient logs "Failed to release a message" when HttpStatus::is4xxClientError occurs


I'm trying to download a zip file from server using Webflux WebClient. What is the correct way to handle errors?

When file exists on server, everything is OK. Otherwise, I got a warning from Netty that it couldn't release a message.

return client.get()
      .uri(String.format("/cache/%s", filename))
      .accept(MediaType.APPLICATION_OCTET_STREAM)
      .retrieve()
      .onStatus(HttpStatus::is3xxRedirection, response -> Mono.error(new RuntimeException(response.statusCode().getReasonPhrase())))
      .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new RuntimeException(response.statusCode().getReasonPhrase())))
      .bodyToMono(ByteArrayResource.class)
      .subscribeOn(Schedulers.parallel())
      .map(res -> {
        try {
          InputStream is = res.getInputStream();
          File targetFile = Paths.get(cacheDir).resolve(filename).toAbsolutePath().toFile();
          FileUtils.copyInputStreamToFile(is, targetFile);
          LOG.info("File {} saved", targetFile.toPath());
        } catch (IOException e) {
          throw new RuntimeException("File download error from cache");
        }
        return true;
      });

I expect no Warn from Netty in log, but actual log is:

WARN <reactor-http-epoll-4> Failed to release a message: DefaultLastHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success) (io.netty.util.ReferenceCountUtil:115)
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) ~[netty-buffer-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:94) ~[netty-codec-http-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.ReferenceCountUtil.safeRelease(ReferenceCountUtil.java:113) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:120) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) [netty-codec-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297) [netty-codec-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:796) [netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:432) [netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:333) [netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]

UPD. Thanks to Thomas, problem has been solved when I've changed code like this

.onStatus(HttpStatus::is3xxRedirection, exceptionFunction)
.onStatus(HttpStatus::is4xxClientError, exceptionFunction)
.....
.....
.....
private final Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction = response -> response.bodyToMono(String.class)
    .map(body -> new RuntimeException(String.format("%s, response body: %s", response.statusCode().toString(), body)));

Solution

  • The documentation states the following:

    When onStatus is used, if the response is expected to have content, then the onStatus callback should consume it. If not, the content will be automatically drained to ensure resources are released.

    Official docs about retrieve() last section.

    What this means is that if the response, when an error occurs, is empty, then the framework will automatically close the connection for you.

    On the other hand if the response actually has some data, you must consume the response so that the connection then can be closed.

    So what you have done is right, you can log the response, or send it in the exception. or ignore it. But you need to consume the response as you have done.

    I also believe you can response.bodyToMono(Void.class) if you just want to dispose it.