My message handler returns a Flux, and ideally I want Spring Integration to take care of subscribing to (or blocking on) the Flux so that my second handler below receives the resolved object. But when the following code runs, the output is: FluxFlatMap
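public IntegrationFlow flow1() {
    Function<Map, Flux<Account>> handler = (map) -> service.getAccounts(map)
            .flatMap(account -> orderService.getOrdersForAcct(account).take(3));
    return IntegrationFlow.from("accountMessageChannel")
            // first handler: returns the Flux as its reply
            .handle((payload, headers) -> handler.apply(payload))
            // second handler: expected the resolved object, but receives the Flux itself
            .handle((payload, headers) -> { System.out.println("got something: " + payload); return null; })
            .get();
}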
I have also tried passing the Flux to a FluxMessageChannel, but the behavior was the same.
This was fixed recently in the current development branch.
So, to make it work and have your Flux reply resolved, you need to do this with the version you are using:
.handle((payload,headers)-> handler.apply(payload), e -> e.async(true))
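To illustrate the idea behind the async flag, here is a minimal plain-JDK sketch. It is only an analogy, not Spring Integration's implementation: `AsyncReplySketch`, `dispatch` and `run` are hypothetical names, and a `CompletableFuture` stands in for the reactive reply. In sync mode the next step receives the unresolved reply object itself (which is why the question prints the Flux's class name); in async mode the reply is resolved first and only the result is passed on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-JDK analogy only; none of these names are Spring Integration APIs.
final class AsyncReplySketch {

    // Hypothetical stand-in for a handler endpoint: it calls the handler
    // and hands the reply to the next step.
    static <P> void dispatch(P payload,
                             Function<P, Object> handler,
                             boolean async,
                             Consumer<Object> next) {
        Object reply = handler.apply(payload);
        if (async && reply instanceof CompletableFuture) {
            // Async mode: forward the resolved value once the future completes.
            ((CompletableFuture<?>) reply).thenAccept(next);
        } else {
            // Sync mode: the reply object itself becomes the next payload.
            next.accept(reply);
        }
    }

    // Runs one message through the sketch and records what the next step saw.
    static List<String> run(boolean async) {
        List<String> seen = new ArrayList<>();
        dispatch("acct-1",
                id -> CompletableFuture.completedFuture("orders for " + id),
                async,
                reply -> seen.add(reply instanceof CompletableFuture
                        ? "unresolved future"
                        : String.valueOf(reply)));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [unresolved future]
        System.out.println(run(true));  // [orders for acct-1]
    }
}
```

The future here is already completed, so the callback runs immediately on the calling thread; with a real asynchronous reply the downstream step would run later, on whichever thread completes it.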