Search code examples
javareactive-programmingwebclientspring-webfluxproject-reactor

Handling exception in WebClient throws io.netty.handler.timeout.ReadTimeoutException


So I am new to Reactive programming and I wrote some code that I would like to test. Those are more of a integration tests as I am live copying files and later check if they are the same. I have a MockWebServer mocking my reponse to be 4xx which is handled well in the code. Unfortunately, I am also getting io.netty.handler.timeout.ReadTimeoutException which covers up my custom WebClientResponseException so in the test I am getting the wrong exception. Basically I have two questions, why on earth am I getting this io.netty.handler.timeout.ReadTimeoutException exception? It appears only after doOnError() method for some reason and I am not sure why is it even happening.

Right now code is at it is and it's being synchronous, I am well aware of that.

Second question is, how could I handle my custom exception in the tests after a given number of retries? Right now it is 3 and only then I would like my other exception to be thrown.

Here is the code:

AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);

Flux<DataBuffer> fileDataStream = Mono.just(filePath)
    .map(file -> targetPath.toFile().exists() ? targetPath.toFile().length() : 0)
    .map(bytes -> webClient
                  .get()
                  .uri(uri)
                  .accept(MediaType.APPLICATION_OCTET_STREAM)
                  .header("Range", String.format("bytes=%d-", bytes))
                  .retrieve()
                  .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
                  .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
                  .bodyToFlux(DataBuffer.class)
                  .doOnError(throwable -> log.info("fileDataStream  onError", throwable))
                )
    .flatMapMany(Function.identity());

return DataBufferUtils
        .write(fileDataStream, fileChannel)
        .map(DataBufferUtils::release)
        .doOnError(throwable -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                throw new WritingException("failed force update to file channel", e);
            }
        })
        .retry(3)
        .doOnComplete(() -> {
             try {
                 fileChannel.force(true);
             } catch (IOException e) {
                 log.warn("failed force update to file channel", e);
                 throw new WritingException("failed force update to file channel", e);
             }
        })
        .doOnError(throwable -> targetPath.toFile().delete())
        .then(Mono.just(target));

The response is Mono<Path> as I am interested only in the Path of the newly created and copied file.

Any comments regarding code are welcome.

The copying mechanism was made basing on this thread Downlolad and save file from ClientRequest using ExchangeFunction in Project Reactor


Solution

  • So basically the problem was in tests. I had a MockResponse enqueued to the MockWebServer only once so upon retrying in the WebClient mocked server did not have any response set (basically it behaved like it is not available at all as there was no mocked response).

    To be able to handle exceptions in case of the server being completely down it is in my opinion worth adding lines that goes something like this into your flux chain:

    .doOnError(ChannelException.class, e -> {
        throw new YourCustomExceptionForHandlingServerIsDownSituation("Server is unreachable", e);
    })
    

    This will help you handle ReadTimeoutException from Netty (in case of server being not reachable) as it extends ChannelException class. Always handle your exceptions.