I am new to reactive programming and have written some code that I would like to test. These are more like integration tests, as I copy files live and later check whether they are identical. I have a MockWebServer mocking my response to be 4xx, which is handled well in the code. Unfortunately, I am also getting io.netty.handler.timeout.ReadTimeoutException, which covers up my custom WebClientResponseException, so in the test I am getting the wrong exception. Basically I have two questions. First, why on earth am I getting this io.netty.handler.timeout.ReadTimeoutException? It appears only after the doOnError() method for some reason, and I am not sure why it is happening at all.
Right now the code is as it is, and it runs synchronously; I am well aware of that.
Second question: how can I handle my custom exception in the tests after a given number of retries? Right now the limit is 3, and only once those retries are exhausted would I like my other exception to be thrown.
Here is the code:
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(targetPath, StandardOpenOption.WRITE);
Flux<DataBuffer> fileDataStream = Mono.just(filePath)
        .map(file -> targetPath.toFile().exists() ? targetPath.toFile().length() : 0)
        .map(bytes -> webClient
                .get()
                .uri(uri)
                .accept(MediaType.APPLICATION_OCTET_STREAM)
                .header("Range", String.format("bytes=%d-", bytes))
                .retrieve()
                .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new CustomException("4xx error")))
                .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new CustomException("5xx error")))
                .bodyToFlux(DataBuffer.class)
                .doOnError(throwable -> log.info("fileDataStream onError", throwable))
        )
        .flatMapMany(Function.identity());
return DataBufferUtils
        .write(fileDataStream, fileChannel)
        .map(DataBufferUtils::release)
        .doOnError(throwable -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                throw new WritingException("failed force update to file channel", e);
            }
        })
        .retry(3)
        .doOnComplete(() -> {
            try {
                fileChannel.force(true);
            } catch (IOException e) {
                log.warn("failed force update to file channel", e);
                throw new WritingException("failed force update to file channel", e);
            }
        })
        .doOnError(throwable -> targetPath.toFile().delete())
        .then(Mono.just(target));
The return type is Mono<Path>, as I am interested only in the Path of the newly created and copied file.
Any comments regarding the code are welcome.
The copying mechanism is based on this thread: Downlolad and save file from ClientRequest using ExchangeFunction in Project Reactor
The problem turned out to be in the tests. I had enqueued a MockResponse to the MockWebServer only once, so when the WebClient retried, the mocked server had no response left to return (it behaved as if it were not available at all, since there was no mocked response).
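The effect can be modelled in plain Java, without Reactor or MockWebServer. This is only a rough sketch under stated assumptions: `MockQueueDemo`, `request`, `lastErrorAfterRetries`, `queueOf`, and both exception classes are hypothetical names, and an empty queue stands in for the mock server having nothing left to return. It illustrates that `retry(3)` allows up to four attempts in total (the initial one plus three retries), so the last error is the 4xx one only if four responses were queued.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class MockQueueDemo {

    /** Hypothetical stand-in for the custom exception raised on a 4xx status. */
    static class ClientErrorException extends RuntimeException {
        ClientErrorException(String message) { super(message); }
    }

    /** Hypothetical stand-in for the read timeout seen when nothing is queued. */
    static class TimeoutStandInException extends RuntimeException {
        TimeoutStandInException(String message) { super(message); }
    }

    /** One request: consumes one queued status code, or "times out" if the queue is empty. */
    static void request(Deque<Integer> queuedStatuses) {
        Integer status = queuedStatuses.poll();
        if (status == null) {
            throw new TimeoutStandInException("no response queued");
        }
        if (status >= 400 && status < 500) {
            throw new ClientErrorException("4xx error");
        }
    }

    /** Runs the initial attempt plus up to maxRetries retries; returns the last error, or null on success. */
    static RuntimeException lastErrorAfterRetries(Deque<Integer> queuedStatuses, int maxRetries) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                request(queuedStatuses);
                return null; // success, no further retries needed
            } catch (RuntimeException e) {
                last = e; // remember the error and try again
            }
        }
        return last;
    }

    /** Builds a queue holding the same status code `count` times. */
    static Deque<Integer> queueOf(int count, int status) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < count; i++) {
            queue.add(status);
        }
        return queue;
    }

    public static void main(String[] args) {
        // One queued response: the retries find an empty queue, so the timeout wins.
        System.out.println(lastErrorAfterRetries(queueOf(1, 404), 3).getClass().getSimpleName());
        // Four queued responses: every attempt sees the 4xx status.
        System.out.println(lastErrorAfterRetries(queueOf(4, 404), 3).getClass().getSimpleName());
    }
}
```

In the real test this corresponds to calling `enqueue` on the MockWebServer once per expected attempt, that is, four times for `retry(3)`.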
To handle exceptions when the server is completely down, it is in my opinion worth adding something like the following to your Flux chain:
.doOnError(ChannelException.class, e -> {
    throw new YourCustomExceptionForHandlingServerIsDownSituation("Server is unreachable", e);
})
This will help you handle Netty's ReadTimeoutException (raised when the server is not reachable), as it is a subclass of ChannelException. Always handle your exceptions.
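Why matching on the parent type is enough can be shown with a small plain-Java sketch. The classes here are hypothetical stand-ins that only mirror the subclass relationship described above; they are not the Netty classes, and `ErrorMappingDemo`, `translate`, and `ServerUnreachableException` are invented names for illustration.

```java
class ErrorMappingDemo {

    /** Hypothetical stand-in for the parent channel exception type. */
    static class ChannelLikeException extends RuntimeException {
        ChannelLikeException(String message) { super(message); }
    }

    /** Hypothetical stand-in for a read timeout, a subclass of the parent type. */
    static class ReadTimeoutLikeException extends ChannelLikeException {
        ReadTimeoutLikeException(String message) { super(message); }
    }

    /** Hypothetical custom exception for the "server is down" situation. */
    static class ServerUnreachableException extends RuntimeException {
        ServerUnreachableException(String message, Throwable cause) { super(message, cause); }
    }

    /** Wraps any channel-level error (including subclasses); passes other errors through unchanged. */
    static RuntimeException translate(RuntimeException error) {
        if (error instanceof ChannelLikeException) {
            return new ServerUnreachableException("Server is unreachable", error);
        }
        return error;
    }

    public static void main(String[] args) {
        RuntimeException timeout = new ReadTimeoutLikeException("read timed out");
        RuntimeException other = new IllegalStateException("unrelated");

        // The subclass is caught by the check against the parent type.
        System.out.println(translate(timeout).getClass().getSimpleName());
        // Unrelated errors are left as they are.
        System.out.println(translate(other).getClass().getSimpleName());
    }
}
```

In a Reactor chain, `onErrorMap` with a type argument can express the same translation by returning the new exception instead of throwing from inside a side-effect callback; whether that fits depends on how the rest of the chain handles errors.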