I had a method createComplianceResponse() that returned a BOLCompliance object. I changed it to return Mono<BOLCompliance> because I had to call another reactive service inside it. After that change, the method that calls it gives these errors at lines 6 and 7:
1) Cannot convert from Mono<Object> to Mono<List<BOLCompliance>>
2) Cannot convert from List<Mono<BOLCompliance>> to List<BOLCompliance>
private Mono<List<BOLCompliance>> getComplienceRouteLink(BOLRouteLink routeLink, BillOfLadingResponse bol) {
    if (null != routeLink.getComplianceIds() && !routeLink.getComplianceIds().isEmpty()) {
        Mono<List<Compliance>> complianceList = Flux
                .fromIterable(routeLink.getComplianceIds()).flatMap(cmp ->
                        complianceCaller.getComplianceById(cmp)).collectList();
        return complianceList.flatMap(compliancesOld -> {
            List<BOLCompliance> complianceResponses = compliancesOld.parallelStream()
                    .map(compliance -> createComplianceResponse(compliance,bol)
                    ).collect(Collectors.toList());
            return Mono.just(complianceResponses);
        });
    }
The method whose return type I changed is:
private Mono<BOLCompliance> createComplianceResponse(Compliance compliance, BillOfLadingResponse bol);
Please take a look at this, which could replace your code:
private Flux<BOLCompliance> getComplienceRouteLink(BOLRouteLink routeLink, BillOfLadingResponse bol) {
    return Mono.justOrEmpty(routeLink.getComplianceIds())
            .flatMapMany(Flux::fromIterable)
            .flatMap(id -> complianceCaller.getComplianceById(id))
            .flatMap(compliance -> createComplianceResponse(compliance, bol));
}
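As for why the original code stopped compiling: once createComplianceResponse returns a Mono, Stream.map only wraps each element, so the collected value is a List<Mono<BOLCompliance>> (error 2). Error 1 most likely follows from that, because the compiler can no longer infer the lambda's result type. If you need to keep the Mono<List<...>> signature, a minimal sketch could look like the following. It assumes reactor-core is on the classpath; String stands in for Compliance, BOLCompliance and BillOfLadingResponse, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapVersusMap {

    // Hypothetical stand-in for createComplianceResponse: an asynchronous step returning a Mono.
    static Mono<String> createResponse(String compliance, String bol) {
        return Mono.just(bol + ":" + compliance);
    }

    // Stream.map with a Mono-returning function only wraps each element,
    // so the result is List<Mono<String>>: the shape behind error 2.
    static List<Mono<String>> nested(List<String> compliances, String bol) {
        return compliances.stream()
                .map(c -> createResponse(c, bol))
                .collect(Collectors.toList());
    }

    // Flux.flatMap subscribes to each inner Mono and emits its value,
    // so collectList() produces the Mono<List<String>> the caller expects.
    static Mono<List<String>> flattened(List<String> compliances, String bol) {
        return Flux.fromIterable(compliances)
                .flatMap(c -> createResponse(c, bol))
                .collectList();
    }

    public static void main(String[] args) {
        // block() is used here only to print the result in a standalone demo.
        List<String> result = flattened(List.of("c1", "c2"), "bol").block();
        System.out.println(result);
    }
}
```

Note that flatMap does not guarantee the order of the results when the inner publishers complete asynchronously; flatMapSequential or concatMap are options if the order of the list matters.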
Instead of returning a Mono<List<Something>>, you return a Flux<Something>, which you can parallelize. It is also important to know which call might take longer and therefore needs to be scheduled on another thread pool so that it can run in parallel. This Flux pipeline is easy to extend with scheduling on another thread pool.
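One way the scheduling could look, as a rough sketch rather than a drop-in solution: each inner call is moved onto Reactor's boundedElastic scheduler with subscribeOn, so slow calls can overlap. It assumes reactor-core 3.3 or later on the classpath; slowLookup is a hypothetical blocking stand-in for whichever call turns out to be slow, and String stands in for the real types.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ScheduledCompliance {

    // Hypothetical blocking lookup standing in for a slow call.
    static String slowLookup(String id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "compliance-" + id;
    }

    static Flux<String> lookupAll(List<String> ids) {
        return Flux.fromIterable(ids)
                // fromCallable defers the blocking call until subscription;
                // subscribeOn runs each inner subscription on the boundedElastic
                // pool, so the lookups can run concurrently.
                .flatMap(id -> Mono.fromCallable(() -> slowLookup(id))
                        .subscribeOn(Schedulers.boundedElastic()));
    }

    public static void main(String[] args) {
        // block() is used here only to print the result in a standalone demo;
        // the order of the elements is not guaranteed.
        List<String> results = lookupAll(List.of("1", "2", "3")).collectList().block();
        System.out.println(results);
    }
}
```

boundedElastic is intended for blocking or I/O-bound work. For CPU-bound work, parallel().runOn(Schedulers.parallel()) followed by sequential() is another option. If the inner call is already non-blocking, no extra scheduling may be needed at all.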