spring-webflux spring-reactor

WebFlux: chaining multiple Monos


I am new to reactive programming (Spring WebFlux) and want to know how best to handle this use case. I have a reactive service call (getAccount) that returns Mono<String>, and I want to chain it with another service call, getBooks, that returns Mono<Set<Book>>, and one final synchronous call, transform, that converts each book, so that the result is something like Mono<Set<LibraryBook>>. How would I chain these calls and return the transformed data as a Mono<Set<LibraryBook>> while logging warnings when getAccount or getBooks returns empty? Here is a simplified version of what I am trying to do.

Given these fake services:

    public static Mono<String> getAccount(String name) {
        return Mono.just(name);
    }

    public static Mono<Set<Book>> getBooks(String title) {
        return Mono.just(Sets.newHashSet(new Book(title + "One", "Author One"),
            new Book(title + "Two", "Author Two"),
            new Book(title + "Three", "Author Three")));
    }

    public static LibraryBook transform(Book a) {
        return new LibraryBook(a.getTitle(), a.getAuthorName(), "someUniqueId");
    }

I want to get the account of a given user, find all the books that he or she borrowed, transform the books, and return the result as a Mono<Set<LibraryBook>>, logging warnings where appropriate.

Here is my start:

public Mono<Set<LibraryBook>> getBorrowedBooks(String userId) {
    return getAccount(userId)
            .flatMap(account -> getBooks(account))
            .log()
            .map(books -> books.stream().map(book -> transform(book)).collect(Collectors.toSet()));
}

However, I am not sure whether mixing reactive operators with the Stream API is a bad thing, and it just does not look right.


Solution

  • Update:

    Since you can't modify the getBooks method, you can write your getBorrowedBooks method in the following way to avoid dealing with streams.

    Note: the logging and the exceptions are just examples. You can handle the empty cases in other ways too.

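    import java.util.List;
    import java.util.Set;

    import com.google.common.collect.Sets;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    // Book, LibraryBook and the two exception classes are assumed to exist elsewhere.
    public class MyApp {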
    
      private static final Logger LOGGER = LoggerFactory.getLogger(MyApp.class);
    
      public static void main(String[] args) {
    
        List<LibraryBook> libraryBooks = getBorrowedBooks("Abhi").collectList().block();
        libraryBooks.forEach(System.out::println);
      }
    
      public static Mono<String> getAccount(String name) {
        return Mono.just(name);
      }
    
      public static Mono<Set<Book>> getBooks(String title) {
        return Mono.just(Sets.newHashSet(new Book(title + "One", "Author One"),
            new Book(title + "Two", "Author Two"),
            new Book(title + "Three", "Author Three")));
    
      }
    
      public static LibraryBook transform(Book a) {
        return new LibraryBook(a.getTitle(), a.getAuthorName(), "someUniqueId");
      }
    
      public static Flux<LibraryBook> getBorrowedBooks(String userId) {
        return getAccount(userId)
            .switchIfEmpty(Mono.defer(() -> {
              LOGGER.error("No account found");
              return Mono.error(new NoAccountFoundException());
            }))
            .flatMap(account -> getBooks(account))
            .flatMapMany(Flux::fromIterable)
            .switchIfEmpty(Mono.defer(() -> {
              LOGGER.error("No books found for account");
              return Mono.error(new NoBooksFoundForGivenAccountException());
            }))
            .map(MyApp::transform);
      }
    }
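
    The question asked for warnings rather than errors, so here is a minimal sketch of that variant. It is only an illustration under some assumptions: the services are hypothetical stand-ins that work on plain strings, and java.util.logging replaces SLF4J to keep the example self-contained. Each switchIfEmpty logs and then completes empty instead of signalling an error.

    ```java
    import java.util.Set;
    import java.util.logging.Logger;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class WarnOnEmpty {

      private static final Logger LOGGER = Logger.getLogger(WarnOnEmpty.class.getName());

      // Hypothetical stand-in: an empty name simulates a missing account.
      static Mono<String> getAccount(String name) {
        return name.isEmpty() ? Mono.empty() : Mono.just(name);
      }

      // Hypothetical stand-in: book titles instead of Book objects.
      static Mono<Set<String>> getBooks(String account) {
        return Mono.just(Set.of(account + "One", account + "Two"));
      }

      static Flux<String> getBorrowedTitles(String userId) {
        return getAccount(userId)
            // defer() ensures the warning is logged only if the source is really empty
            .switchIfEmpty(Mono.defer(() -> {
              LOGGER.warning("No account found for " + userId);
              return Mono.empty();
            }))
            .flatMap(WarnOnEmpty::getBooks)
            .flatMapMany(Flux::fromIterable)
            .switchIfEmpty(Flux.defer(() -> {
              LOGGER.warning("No books found for " + userId);
              return Flux.empty();
            }));
      }
    }
    ```

    With this layout a missing account also triggers the second warning, because the empty signal propagates down the chain. If only one message is wanted, the second check could be moved inside the flatMap on the account.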
    

    Original answer: your code compiles, but logically I think it is incorrect because it does not respect the definitions of Mono and Flux.

    Mono is a stream of 0..1 elements. Flux is a stream which can emit 0..N elements.
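
    A small sketch of that difference, assuming reactor-core is on the classpath (the class and method names are made up for illustration):

    ```java
    import java.util.List;
    import java.util.Optional;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class MonoVsFlux {

      // A Mono carrying exactly one element.
      static Optional<String> one() {
        return Mono.just("account").blockOptional();
      }

      // A Mono that completes without emitting anything.
      static Optional<String> none() {
        return Mono.<String>empty().blockOptional();
      }

      // A Flux emitting several elements before completing.
      static List<Integer> many() {
        return Flux.just(1, 2, 3).collectList().block();
      }
    }
    ```

    The blocking calls are there only to make the results visible in a demo; they should not appear in a real reactive pipeline.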

    The method getBooks (as the name suggests) should emit more than one element (a Book here) for a given title. So its return type should be a Flux instead of a Mono of a collection.

    You can even look at the methods of Spring's reactive repositories for examples:

    ReactiveCrudRepository

    Removing duplicates by storing the elements in a HashSet is, in the reactive world, equivalent to calling distinct() on a Flux of elements.

    Flux.distinct()
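
    Like a HashSet, distinct() relies on equals and hashCode, so Book needs value-based implementations of both for duplicates to be detected. A minimal sketch, assuming Java 16+ and using a hypothetical record in place of the real Book class:

    ```java
    import java.util.List;
    import reactor.core.publisher.Flux;

    public class DistinctDemo {

      // Hypothetical stand-in for Book; a record gets value-based equals/hashCode for free.
      record Book(String title, String authorName) {}

      static List<Book> distinctBooks() {
        return Flux.just(
                new Book("A", "X"),
                new Book("A", "X"),   // duplicate by value, dropped by distinct()
                new Book("B", "Y"))
            .distinct()
            .collectList()
            .block();               // blocking only for the sake of the demo
      }
    }
    ```

    If Book were a plain class without equals and hashCode, the two "A" books would be compared by identity and both would pass through.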

    So your getBooks method should look like:

      public static Flux<Book> getBooks(String title){
    
        return Flux.just(new Book(title + "One", "Author One"),
            new Book(title +"Two", "Author Two"),
            new Book(title + "Three", "Author Three"))
            .distinct();
    
      }
    

    And your getBorrowedBooks method should look like:

      public Flux<LibraryBook> getBorrowedBooks(String userId) {
        return getAccount(userId)
            .flatMapMany(account -> getBooks(account))
            .log()
            .map(book -> transform(book));
      }
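
    If a caller still needs a Mono<Set<LibraryBook>>, as the question asked, the Flux can be collected at the very end. The following is a sketch rather than a definitive implementation: it assumes Java 16+ and uses hypothetical records in place of the real Book and LibraryBook classes.

    ```java
    import java.util.Set;
    import java.util.stream.Collectors;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class CollectToSet {

      // Hypothetical stand-ins for the real domain classes.
      record Book(String title, String authorName) {}
      record LibraryBook(String title, String authorName, String id) {}

      static Mono<String> getAccount(String name) {
        return Mono.just(name);
      }

      static Flux<Book> getBooks(String title) {
        return Flux.just(
                new Book(title + "One", "Author One"),
                new Book(title + "Two", "Author Two"),
                new Book(title + "Three", "Author Three"))
            .distinct();
      }

      static LibraryBook transform(Book b) {
        return new LibraryBook(b.title(), b.authorName(), "someUniqueId");
      }

      static Mono<Set<LibraryBook>> getBorrowedBooks(String userId) {
        return getAccount(userId)
            .flatMapMany(CollectToSet::getBooks)
            .map(CollectToSet::transform)
            // gather the elements into a single Set once the Flux completes
            .collect(Collectors.toSet());
      }
    }
    ```

    This keeps the pipeline element-wise and avoids the nested books.stream() call from the question; the only java.util.stream piece left is the Collector passed to collect.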