spring-boot spring-webflux spring-reactive spring-reactor

Will returning a Mono<ServerResponse> result in (evil) synchronous, blocking client/server comms?


I'm new to Spring Reactor and WebFlux and a little confused about the event flow within Spring functional web. Example: I have a handler function returning a Mono<ServerResponse>. Within it, a findAll() repository method is executed, returning a Flux<T>. In compliance with the Reactive Manifesto, in order to be async, non-blocking and allow backpressure, I would like to see an onNext() for every element returned from the repository. However, looking at the server logs during request processing, I see only one onNext() event, which makes sense as my return type is a Mono containing the response:

Router Function

@Bean
public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
    return RouterFunctions
            .route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON)),
                    itemsHandler::getAll);
}

Handler Function

Mono<ServerResponse> getAll(ServerRequest request) {
    return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(itemRepository.findAll(), Item.class)
            .log("GET items");
}

Events log

2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | request(unbounded)
2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onComplete()

In contrast, implementing a classic Spring annotated controller method with a Flux<T> as return type, I'll see an onNext() for every instance of T (i.e. every item of the result set), which looks more "correct" to me (client now has control over event flow etc.):

Controller

@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
    return itemRepository
            .findAll()
            .log("GET items");
}

Log

2020-05-10 15:14:04.135  INFO 19096 --- [ctor-http-nio-5] GET items                                : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136  INFO 19096 --- [ctor-http-nio-5] GET items                                : request(unbounded)
2020-05-10 15:14:04.137  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onComplete()

This is confusing. Let me elaborate:

  • Using Mono<ServerResponse> seems evil in the sense that it encapsulates the entire result set in a single event, which to me feels like breaking the reactive principles of asynchronous, non-blocking, backpressure-enabled event flow. Doesn't this take the control away from the client? This looks like traditional, blocking client/server communication to me.
  • Returning the Flux<T> directly feels a lot nicer because it enables per-result event handling and backpressure control.

My questions are:

  • What are the implications of creating a Mono<ServerResponse>? Will this cause a blocking, synchronous interaction, emitting the onNext() only when all items have been read from the repo? Will I lose backpressure functionality etc.?
  • How can I get the functional style backend to send an onNext() for every item in the result set?
  • What would be the best practice in terms of the return type of a functional style handler function that is fully reactive, i.e. non-blocking, asynchronous, and backpressure compatible? I'm unsure whether Mono<ServerResponse> breaks these reactive principles.

I might be completely wrong or missing something important. Thanks for your help!


Solution

  • It all depends on the client consuming the ServerResponse. According to the WebFlux docs (https://docs.spring.io/spring-framework/docs/5.2.x/spring-framework-reference/web-reactive.html#spring-webflux), setting up handler functions to return Mono<ServerResponse>, regardless of the number of returned items, is the standard way and absolutely fine: as long as the client correctly handles the underlying Flux<T>, all is well. My problem arose because I tested the endpoints using curl, which only prints the response body and gives no view of the underlying Flux. Using a reactive client (like org.springframework.web.reactive.function.client.WebClient), the response body can be de-serialized into a Flux<T> first, enabling all the nice reactive functionality and making our onNext() events show up.

    Client code

    Calling the backend like so, de-serializing the ServerResponse into a Flux:

    @GetMapping(CLIENT_ITEMS_RESOURCE_ENDPOINT_URL)
    public Flux<Item> getAllItems(@RequestParam(defaultValue = "true") boolean useRetrieve) {
        return webClient.get().uri(SERVER_ITEMS_RESOURCE_V2_ENDPOINT_URL)
                .retrieve()
                .bodyToFlux(Item.class) // <-- de-serialize the ServerResponse into a Flux
                .log("GET all items from server");
    }
    

    Will result in seeing all onNext() events, enabling client-side event handling:

    2020-05-10 16:10:10.504  INFO 10000 --- [ctor-http-nio-2] GET all items from server   : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
    2020-05-10 16:10:10.504  INFO 10000 --- [ctor-http-nio-2] GET all items from server   : request(unbounded)
    2020-05-10 16:10:10.511  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=bla bla, price=4999.0))
    2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=bla bla bla, price=7249.99))
    2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=bla bla bla bla, price=2399.99))
    2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=bla bla bla bla bla, price=699.99))
    2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=another item, price=1.99))
    2020-05-10 16:10:10.513  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onComplete()
    

    So all is well and fully reactive as long as proper client handling of the response takes place.
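
One way to picture why the single onNext() on the Mono<ServerResponse> is harmless: the response object only carries the body publisher, and items flow once the server subscribes to that body and demand is signalled. Below is a minimal plain-JDK sketch of that idea. It uses java.util.concurrent.Flow instead of Reactor so that it runs without Spring, and the names Response, itemsPublisher, handle and Collector are hypothetical stand-ins, not WebFlux APIs. In the real handler, moving .log("GET items") from the Mono onto the Flux passed to body(), i.e. itemRepository.findAll().log("GET items"), should make the per-item onNext() signals visible in the server log as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class ResponseSignalsDemo {

    /** Hypothetical stand-in for ServerResponse: a status plus a lazy body publisher. */
    static final class Response {
        final int status;
        final Flow.Publisher<String> body;

        Response(int status, Flow.Publisher<String> body) {
            this.status = status;
            this.body = body;
        }
    }

    /**
     * Simplified synchronous publisher: emits only as many items as requested.
     * (It skips parts of the Reactive Streams spec, e.g. rejecting n <= 0.)
     */
    static Flow.Publisher<String> itemsPublisher(List<String> items, List<String> log) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                for (long i = 0; i < n && next < items.size(); i++) {
                    String item = items.get(next++);
                    log.add("body onNext(" + item + ")");
                    subscriber.onNext(item);
                }
                if (next == items.size() && !done) {
                    done = true;
                    log.add("body onComplete()");
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
                next = items.size();
            }
        });
    }

    /** Mimics the handler: one "response" signal, body not yet consumed. */
    static Response handle(List<String> log) {
        Response response = new Response(200, itemsPublisher(List.of("a", "b", "c"), log));
        log.add("response onNext(status=" + response.status + ")");
        return response;
    }

    /** Subscriber that stores what it receives and lets the caller drive demand. */
    static final class Collector implements Flow.Subscriber<String> {
        Flow.Subscription subscription;
        final List<String> items = new ArrayList<>();
        boolean completed = false;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(String item) { items.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Response response = handle(log);                 // only the response signal so far
        Collector collector = new Collector();
        response.body.subscribe(collector);              // the "server" starts writing the body
        collector.subscription.request(2);               // demand for two items only
        collector.subscription.request(Long.MAX_VALUE);  // then unbounded demand
        log.forEach(System.out::println);
    }
}
```

In this sketch the handler produces exactly one signal (the response), and the body items are emitted one by one only after something subscribes to the body and requests them, which mirrors the single onNext() seen on the Mono in the server log above.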