Using Spring Boot WebFlux, I noticed that doOnNext is executed on a Flux object only if it is chained before that object is assigned to a local variable.
To reproduce this behaviour I use an app created with Spring Initializr using the default settings. The only dependency I added to the POM is "Spring Reactive Web".
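    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.server.ServerWebExchange;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    @Controller
    public class ProblemController {
        @PostMapping("/upload")
        public Mono<ResponseEntity<String>> upload(ServerWebExchange exchange) {
            Flux<DataBuffer> body = exchange.getRequest().getBody()
                    // This prints!
                    .doOnNext(db -> System.out.println("Bytes A: " + db.readableByteCount()));
            // Does not print anything...
            body.doOnNext(db -> System.out.println("Bytes C: " + db.readableByteCount()));
            return body
                    .next()
                    .map(db -> String.format("Uploaded %d bytes", db.readableByteCount()))
                    .map(s -> new ResponseEntity<String>(s, HttpStatus.ACCEPTED));
        }
    }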
When I upload a file with
    curl --location --request POST 'http://localhost:8080/upload' \
        --form 'file=@"/path/to/file.txt"'
I can see that only the Bytes A message gets printed.
Even if I add more doOnNext callbacks to the same chain, they are all executed. For example,
    Flux<DataBuffer> body = exchange.getRequest().getBody()
            // This prints!
            .doOnNext(db -> System.out.println("Bytes A: " + db.readableByteCount()))
            .doOnNext(db -> System.out.println("Bytes X: " + db.readableByteCount()));
prints both Bytes A and Bytes X.
It never prints Bytes C, though. This gives me the impression that something in the Flux object changes when it gets assigned to a local variable (Flux<DataBuffer> body).
What is really happening in this example? And why can't I see Bytes C printed?
Each operator in the Flux and Mono APIs returns a new instance instead of modifying the one it is called on. When you chain calls, each operator builds on the previous intermediate instance, and what you end up holding is the "outer" one.
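To make the "new instance" point concrete, here is a deliberately simplified model in plain Java. MiniFlux is a hypothetical toy type invented for this illustration; it is not how Reactor is implemented, and it only mimics the one property that matters here: an operator leaves the receiver untouched and hands back a wrapped copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class OperatorChainSketch {

    // Hypothetical toy type, NOT Reactor's implementation.
    static final class MiniFlux<T> {
        private final List<T> items;
        private final List<Consumer<T>> callbacks;

        private MiniFlux(List<T> items, List<Consumer<T>> callbacks) {
            this.items = items;
            this.callbacks = callbacks;
        }

        static <T> MiniFlux<T> just(List<T> items) {
            return new MiniFlux<>(List.copyOf(items), List.of());
        }

        // Like a Reactor operator: `this` is not modified, a new instance is returned.
        MiniFlux<T> doOnNext(Consumer<T> callback) {
            List<Consumer<T>> extended = new ArrayList<>(callbacks);
            extended.add(callback);
            return new MiniFlux<>(items, extended);
        }

        // Callbacks only run for the instance that is actually subscribed to.
        void subscribe() {
            for (T item : items) {
                for (Consumer<T> callback : callbacks) {
                    callback.accept(item);
                }
            }
        }
    }

    // Returns the labels recorded while `body` is consumed.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniFlux<Integer> body = MiniFlux.just(List.of(42))
                .doOnNext(n -> log.add("A"));   // kept: this instance is assigned to `body`
        body.doOnNext(n -> log.add("C"));       // dropped: the returned instance is ignored
        body.subscribe();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model, only the callbacks attached to the instance that ends up being subscribed to are ever invoked.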
With your Bytes C example you've simply called the method without assigning the resulting new instance to any variable, so that instance was discarded (and became eligible for garbage collection) and was never part of the execution path. The body you return is still the instance from before that call.
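A minimal sketch of the difference, using only reactor-core (which "Spring Reactive Web" pulls in). The class name, the string elements standing in for DataBuffer chunks, and the extra "D" label are made up for the illustration, and blockLast() is used only so the demo runs outside a web request:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class DoOnNextChaining {

    // Returns the labels recorded while the final `body` is consumed.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        Flux<String> body = Flux.just("chunk-1", "chunk-2")
                .doOnNext(s -> log.add("A: " + s));   // part of `body`

        // Result ignored: this callback never becomes part of `body`.
        body.doOnNext(s -> log.add("C: " + s));

        // Reassigning keeps the decorated instance, so this callback runs.
        body = body.doOnNext(s -> log.add("D: " + s));

        body.blockLast();   // subscribe and wait; blocking is for the demo only
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the controller from the question, the equivalent change would be either body = body.doOnNext(...) or moving the second doOnNext into the chain that is returned.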
https://projectreactor.io/docs/core/release/reference/#faq.chain