My message handler returns a Flux. Ideally, I want Spring Integration to handle subscribing to (or blocking on) the Flux so that my second handler below receives the resolved objects. But when the following code runs, the second handler logs the Flux itself rather than its elements. The output is: FluxFlatMap
@Bean
public IntegrationFlow flow1() {
    Function<Map,Flux<Account>> handler = (map)->service.getAccounts(map)
            .take(20)
            .log()
            .flatMap(account -> orderService.getOrdersForAcct(account).take(3));
    return IntegrationFlow.from("accountMessageChannel")
            .transform(Transformers.fromJson(Map.class))
            .handle((payload,headers)-> handler.apply(payload))
            .handle((payload,headers)->{log.info("got something");log.info(payload.toString());return null;})
            .get();
}
I have tried passing off the Flux to a FluxMessageChannel but the behavior was the same.
This was fixed recently in the current development branch: https://github.com/spring-projects/spring-integration/wiki/Spring-Integration-6.0-to-6.1-Migration-Guide#endpoint-is-async-by-default-for-output-channel-as-fluxmessagechannel.
So, to make it work and have your Flux reply resolved, you need to do this with the version you are using:
.handle((payload,headers)-> handler.apply(payload), e -> e.async(true))
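For context, here is a rough sketch of how that line might sit in the flow from the question. It is a configuration fragment, not a verified fix: it reuses the question's names (service, orderService, log, Account, accountMessageChannel) as-is. It also assumes that the reply goes through a FluxMessageChannel, as the question attempted and as the linked migration guide entry describes. The .channel(c -> c.flux()) step is that assumption, not something stated in the answer above.

```java
@Bean
public IntegrationFlow flow1() {
    // Same handler as in the question: it returns a Flux, not resolved values.
    Function<Map, Flux<Account>> handler = (map) -> service.getAccounts(map)
            .take(20)
            .log()
            .flatMap(account -> orderService.getOrdersForAcct(account).take(3));

    return IntegrationFlow.from("accountMessageChannel")
            .transform(Transformers.fromJson(Map.class))
            // The change from the answer: mark this endpoint as async.
            .handle((payload, headers) -> handler.apply(payload), e -> e.async(true))
            // Assumption: a FluxMessageChannel as the output channel, so the
            // framework subscribes to the Flux and emits each element downstream.
            .channel(c -> c.flux())
            .handle((payload, headers) -> {
                log.info("got something");
                log.info(payload.toString());
                return null;
            })
            .get();
}
```

If this behaves as the migration guide suggests, the second handler should be called once per emitted element instead of once with the Flux object.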