Tags: spring-boot, reactive-programming, spring-webflux, flux

Spring Reactive Programming with Webflux - multiple operations as a non-blocking stream


I have the following code:

public Flux<Offer> getAllFilteredOffers(Map<String, String> searchParams) {

    Flux<ProductProperties> productProperties = productPropertiesService.findProductPropertiesBySearchCriteria(searchParams);
    Flux<Product> products = productService.findProductsByPropertyId(productProperties);
    Flux<Product> productsByAvailability = productService.getAllProductsByAvailability(products, searchParams);
    Flux<Offer> offers = offerRepository.findByPropertiesIds(productsByAvailability);
    return offers;
}

This method:

productService.getAllProductsByAvailability(products, searchParams);

looks like:

public Flux<Product> getAllProductsByAvailability(Flux<Product> products,
            Map<String, String> searchParams) {

How can I pass a List<Product> to getAllProductsByAvailability while keeping the operations non-blocking? I've read that map is blocking and should be avoided. Maybe something like this?

    Flux
                    .just(productPropertiesService.findProductPropertiesBySearchCriteria(searchParams))
                    .flatMap(productProperties -> productService.findProductsByPropertyId(productProperties))
                    .flatMap(products -> productService.getAllProductsByAvailability(Flux.create(products)?????????, searchParams))
???

I'm not an expert in WebFlux. I'm currently trying to figure out how to handle problems like this one: I have a Flux, but in a second step I need to pull some data out of the previous Flux<> object while keeping the stream non-blocking.

Thank you!


Solution

  • I don't know where you read that about map, but the official documentation for the map operator says nothing about blocking: it simply applies a synchronous function to each item.

    Use this code:

    productPropertiesService.findProductPropertiesBySearchCriteria(searchParams)
                    .flatMap(productProperties -> productService.findProductsByPropertyId(productProperties))
                    .collectList()   // (1)
                    .flatMapMany(products -> productService.getAllProductsByAvailability(Flux.fromIterable(products), searchParams))   // (2)
    

    1) collectList gathers all elements into a List and returns it as a Mono<List<Product>>.

    2) Flux.fromIterable creates a Flux from the List, which is passed as the parameter; flatMapMany then transforms the Mono into a Flux.
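
To try the chain outside the application, here is a minimal sketch that assumes Project Reactor (reactor-core) is on the classpath. The record types and the three static stub methods are hypothetical stand-ins for the question's services, not the real implementations; in particular, findProductsByPropertyId is assumed here to take a single ProductProperties item, as the flatMap call in the answer implies.

```java
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

public class FilteredProductsSketch {

    // Hypothetical stand-ins for the domain types in the question.
    record ProductProperties(int id) {}
    record Product(int propertyId, boolean available) {}

    // Stub for productPropertiesService.findProductPropertiesBySearchCriteria (assumed data).
    static Flux<ProductProperties> findProductPropertiesBySearchCriteria(Map<String, String> searchParams) {
        return Flux.just(new ProductProperties(1), new ProductProperties(2));
    }

    // Stub for productService.findProductsByPropertyId, assumed to take one item at a time.
    static Flux<Product> findProductsByPropertyId(ProductProperties properties) {
        return Flux.just(new Product(properties.id(), true), new Product(properties.id(), false));
    }

    // Stub for productService.getAllProductsByAvailability: keeps only available products.
    static Flux<Product> getAllProductsByAvailability(Flux<Product> products, Map<String, String> searchParams) {
        return products.filter(Product::available);
    }

    // The chain from the answer. Nothing executes until a subscriber arrives.
    static Flux<Product> viaCollectList(Map<String, String> searchParams) {
        return findProductPropertiesBySearchCriteria(searchParams)
                .flatMap(FilteredProductsSketch::findProductsByPropertyId)
                .collectList()   // (1) Mono<List<Product>>
                .flatMapMany(products ->
                        getAllProductsByAvailability(Flux.fromIterable(products), searchParams)); // (2)
    }

    // Alternative: because the method already accepts a Flux, the flatMap result
    // can be handed over directly, without buffering everything into a List first.
    static Flux<Product> viaDirectFlux(Map<String, String> searchParams) {
        Flux<Product> products = findProductPropertiesBySearchCriteria(searchParams)
                .flatMap(FilteredProductsSketch::findProductsByPropertyId);
        return getAllProductsByAvailability(products, searchParams);
    }

    public static void main(String[] args) {
        // block() is used only to print the demo result; avoid it in request-handling code.
        List<Product> result = viaCollectList(Map.of()).collectList().block();
        System.out.println(result);
    }
}
```

Both variants stay non-blocking. The collectList version waits for the upstream to complete before emitting anything downstream, so it is worth using only when the whole list is really needed at once; otherwise passing the Flux directly lets items flow through as they arrive.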