Tags: spring-webflux, reactive-programming, project-reactor, spring-cloud-function

Avoiding Nested Flux<Mono<>> in a Function Method of Flux<> Input


I am trying out Spring Cloud Kafka Reactive together with Reactive Mongo, and I am not experienced with Reactor. I keep ending up with a nested Flux<Mono<>>, as described below.

Here are methods and snippets:

Kafka messages are bound to a function method by Spring Cloud Stream. Both the input and the output are a Flux.

public Function<Flux<Employee>, Flux<Message<Employee>>> enrich() {
    return flux-> ...
}

I have a standard Spring Data reactive MongoDB repository with query methods:

public interface EmployeeRepository extends ReactiveMongoRepository<Employee, String>{ 
  Mono<Employee> findOneById(String id); 
}

I need to extract the id from each incoming Kafka message in the Flux and pass it to the MongoDB repository method. After extracting the id, I always end up with a nested Mono from the database call. I have tried several approaches, but whenever I use map or zipWith I get a Mono or Flux inside the Flux.

public Function<Flux<Employee>, Flux<Message<Employee>>> enrich(){
    return flux-> flux.map(msg -> MessageBuilder.withPayload(findOneById(msg.getId())).build()) ....
}

Is there a way, with zipWith or some other operator, to extract the id from each element of the Flux and pass it to the database call in one step, so that I do not end up with a Mono inside a Flux?


Solution

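  • Use flatMap instead of map. map applies a synchronous transformation, so a function that returns a Mono leaves that Mono wrapped inside each element. flatMap subscribes to the Mono returned for each element and merges the results into the outer Flux, so the result is a flat Flux<Message<Employee>>. flatMap may process several elements concurrently and does not guarantee output order; if order matters, concatMap or flatMapSequential can be used in its place.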

    public Function<Flux<Employee>, Flux<Message<Employee>>> enrich(EmployeeRepository employeeRepository) {
        return flux -> flux.flatMap(employee -> 
            employeeRepository.findOneById(employee.getId())
                .map(enrichedEmployee -> MessageBuilder.withPayload(enrichedEmployee).build())
        );
    }
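
To see why map nests and flatMap flattens, here is a minimal plain-JDK analogy, not Reactor code. It assumes Stream as a stand-in for Flux and Optional as a stand-in for Mono; the class name FlattenSketch, the in-memory STORE, and the String-based findOneById are all hypothetical. Unlike Reactor, everything in this sketch runs synchronously and in order.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-JDK analogy only: Stream stands in for Flux, Optional for Mono.
class FlattenSketch {

    // Hypothetical in-memory "repository": id -> employee name.
    static final Map<String, String> STORE = Map.of("1", "Ada", "2", "Grace");

    // Stand-in for Mono<Employee> findOneById(String id): zero or one result.
    static Optional<String> findOneById(String id) {
        return Optional.ofNullable(STORE.get(id));
    }

    // map keeps each lookup result wrapped, so the element type becomes
    // Optional<String> (the analogue of ending up with Flux<Mono<Employee>>).
    static List<Optional<String>> withMap(Stream<String> ids) {
        return ids.map(FlattenSketch::findOneById).collect(Collectors.toList());
    }

    // flatMap unwraps each result into the outer stream, so the element type
    // stays String (the analogue of Flux<Employee>); empty lookups disappear.
    static List<String> withFlatMap(Stream<String> ids) {
        return ids.flatMap(id -> findOneById(id).stream())
                  .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(withMap(Stream.of("1", "3", "2")));
        System.out.println(withFlatMap(Stream.of("1", "3", "2")));
    }
}
```

With id "3" missing from the store, the map version still yields three wrapped elements, while the flatMap version yields only the two names that were found. The same applies to the Reactor code above: if findOneById completes empty for an id, flatMap emits nothing for that employee.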