I'm new to Spring Reactor and WebFlux and a little confused regarding the event flow within Spring functional web.
Example: I have a handler function returning a Mono<ServerResponse>
. Within it, a findAll()
repository method is executed returning a Flux<T>
. In compliance to the reactive manifesto, in order to be async, non-blocking and allow backpressure I would like to see an onNext()
for every element returned from the repository. However, looking at the server logs during request processing I see only one onNext()
event, which makes sense as my return type is a Mono
containing the response:
Router Function
@Bean
public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
return RouterFunctions
.route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON))
, itemsHandler::getAll);
}
Handler Function
Mono<ServerResponse> getAll(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(itemRepository.findAll(), Item.class)
.log("GET items");
}
Events log
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | request(unbounded)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745 INFO 19096 --- [ctor-http-nio-4] GET items : | onComplete()
In contrast, implementing a classic Spring annotated controller method with a Flux<T>
as return type, I'll see an onNext()
for every instance of T
(i.e. every item of the result set), which looks more "correct" to me (client now has control over event flow etc.):
Controller
@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
return itemRepository
.findAll()
.log("GET items");
}
Log
2020-05-10 15:14:04.135 INFO 19096 --- [ctor-http-nio-5] GET items : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136 INFO 19096 --- [ctor-http-nio-5] GET items : request(unbounded)
2020-05-10 15:14:04.137 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onComplete()
This is confusing. Let me elaborate:
Mono<ServerResponse>
seems evil in the sense that it encapsulates the entire result set in a single event, which for me feels like breaking the reactive priciples of asynchronous, non-blocking, backpressure-enabled event flow. Doesn't this take the control away from the client? This looks like traditional, blocking, client/server communications to me.Flux<T>
directly feels a lot nicer because it enables per-result event handling and backpressure control.My questions are:
Mono<ServerResponse>
? Will this cause a blocking, synchrous interaction, emitting the onNext()
only when all items have been read from the repo? Will I lose backpressure functionality etc.?onNext()
for every item in the result set?Mono<ServerResponse>
doesn't break these reactive principles.I might be completely wrong or missing something important. Thanks for your help!
It all depends on the client consuming the ServerResponse
. According to the WebFlux docs (https://docs.spring.io/spring-framework/docs/5.2.x/spring-framework-reference/web-reactive.html#spring-webflux) setting up handler functions to return Mono<ServerResponse>
regardless of the number of returned items is the standard way and absolutely fine - as long as the client correctly handles the underlying Flux<T>
all is well. My problem arose because I tested the the endpoints using curl
, which isn't able to detect the underlying Flux
. Using a functional-style enabled client (like org.springframework.web.reactive.function.client.WebClient
), the Mono<ServerResponse>
can be de-serialized into a Flux<T>
first, enabling all the nice reactive functionality, and making our onNext()
events show up.
Client code
Calling the backend like so, de-serializing the ServerResponse into a Flux:
@GetMapping(CLIENT_ITEMS_RESOURCE_ENDPOINT_URL)
public Flux<Item> getAllItems(@RequestParam(defaultValue = "true") boolean useRetrieve) {
return webClient.get().uri(SERVER_ITEMS_RESOURCE_V2_ENDPOINT_URL)
.retrieve()
.bodyToFlux(Item.class) // <-- de-serialize the ServerResponse into a Flux
.log("GET all items from server");
}
Will result in seeing all onNext()
events, enabling client-side event handling:
2020-05-10 16:10:10.504 INFO 10000 --- [ctor-http-nio-2] GET all items from server : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2020-05-10 16:10:10.504 INFO 10000 --- [ctor-http-nio-2] GET all items from server : request(unbounded)
2020-05-10 16:10:10.511 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=bla bla, price=4999.0))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=bla bla bla, price=7249.99))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=bla bla bla bla, price=2399.99))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=bla bla bla bla bla, price=699.99))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=another item, price=1.99))
2020-05-10 16:10:10.513 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onComplete()
So all is well and fully reactive as long as proper client handling of the response takes place.