Search code examples
javareactive-programmingproject-reactor

Subscribe / Iterate a List of Mono


I have a list of Rules that return Mono<Result>, I need to execute them and return another List of Mono with the result of each function.

Mono<List< Rule>> to Mono<List< RuleResult>>

I have this, which works but blocking the execution seems not correct to me:

List<Rule> RuleSet
...
Mono<List<RuleResult>> result= Mono.just(RuleSet.stream().map(rule -> rule.assess(object).block()).collect(Collectors.toList()));

How can I convert this into less blocking version?

I tried the following:

//Create a List of Monos
List<Mono<RuleResult>> ruleStream=statefulRuleSet.stream().map(rule -> rule.assess(assessmentObject)).collect(Collectors.toList());

//Cannot convert type
Flux.zip(ruleStream,...)).collectList();

//Not sure how to do this
Flux.fromIterable(ruleStream...).collectList();

Perhaps I am thinking of a wrong solution overall, somebody has any pointers?


Solution

  • For example:

    interface Rule extends Function<Object, Mono<Object>> { }
    
    public static void main(String[] args) {
    
      Rule rule1 = (o) -> Mono.just(Integer.valueOf(o.hashCode()));
      Rule rule2 = (o) -> Mono.just(o.toString());
      List<Rule> rules = List.of(rule1, rule2);
    
      Object object = new Object();
    
      Mono<List<Object>> result = Flux.fromIterable(rules)
        .flatMapSequential(rule -> rule.apply(object)).collectList();
    
      result.subscribe(System.out::println);
    }
    

    Using flatMapSequential allows you to wait for up to maxConcurrency results at the same time. The maxConcurrency value can be specified as an additional argument to flatMapSequential. In reactor-core 3.3.8 its default value is 256.