Tags: spring-boot, spring-integration, spring-webflux, project-reactor, spring-integration-dsl

Handling a Spring Integration MessageHandler that returns a Flux


My message handler returns a Flux. Ideally, I want Spring Integration to take care of subscribing to (or blocking on) the Flux, so that the second handler below receives the resolved objects. Instead, when the following code runs, the second handler logs: FluxFlatMap

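@Bean
public IntegrationFlow flow1() {
    // Builds a Flux; nothing inside this function subscribes to it.
    Function<Map, Flux<Account>> handler = (map) -> service.getAccounts(map)
            .take(20)
            .log()
            .flatMap(account -> orderService.getOrdersForAcct(account).take(3));

    return IntegrationFlow.from("accountMessageChannel")
            .transform(Transformers.fromJson(Map.class))
            // The <Map> type witness lets the lambda pass the payload to handler.apply(...).
            .<Map>handle((payload, headers) -> handler.apply(payload))
            // The payload arriving here is the Flux itself, hence "FluxFlatMap" in the log.
            .handle((payload, headers) -> {
                log.info("got something");
                log.info(payload.toString());
                return null;
            })
            .get();
}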

I have tried passing off the Flux to a FluxMessageChannel but the behavior was the same.


Solution

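  • This was fixed recently in the current development branch: from 6.1, an endpoint is async by default when its output channel is a FluxMessageChannel. See the migration guide: https://github.com/spring-projects/spring-integration/wiki/Spring-Integration-6.0-to-6.1-Migration-Guide#endpoint-is-async-by-default-for-output-channel-as-fluxmessagechannel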

    So, to have your Flux reply resolved with the version you are using, you need to enable async mode on the endpoint explicitly:

    .<Map>handle((payload, headers) -> handler.apply(payload), e -> e.async(true))
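
For context, here is a sketch of how the whole flow might look with that option applied. It is not taken from the answer and makes several assumptions: the class name `AccountFlowConfig`, the `Account` and `Order` records, and the `AccountService` / `OrderService` interfaces are hypothetical stand-ins for the asker's own types. The `.channel(c -> c.flux())` step is my reading of the migration-guide heading and of the question's mention of FluxMessageChannel: as far as I understand, the async endpoint subscribes to the Flux only when its output channel is a FluxMessageChannel, and each emitted element then reaches the next handler as its own message.

```java
import java.util.Map;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Transformers;
import reactor.core.publisher.Flux;

@Configuration
public class AccountFlowConfig {

    // Hypothetical stand-ins for the asker's domain types and services.
    public record Account(String id) {}
    public record Order(String id) {}

    public interface AccountService {
        Flux<Account> getAccounts(Map<String, Object> criteria);
    }

    public interface OrderService {
        Flux<Order> getOrdersForAcct(Account account);
    }

    private static final Logger log = LoggerFactory.getLogger(AccountFlowConfig.class);

    private final AccountService service;
    private final OrderService orderService;

    public AccountFlowConfig(AccountService service, OrderService orderService) {
        this.service = service;
        this.orderService = orderService;
    }

    @Bean
    public IntegrationFlow flow1() {
        // Still only describes the pipeline; the endpoint is what subscribes to it.
        Function<Map<String, Object>, Flux<Order>> handler = map -> service.getAccounts(map)
                .take(20)
                .log()
                .flatMap(account -> orderService.getOrdersForAcct(account).take(3));

        return IntegrationFlow.from("accountMessageChannel")
                .transform(Transformers.fromJson(Map.class))
                // async(true): treat a Publisher reply as an async result, not as a plain payload.
                .<Map<String, Object>>handle((payload, headers) -> handler.apply(payload),
                        e -> e.async(true))
                // Assumption: a FluxMessageChannel as the output channel of the endpoint above.
                .channel(c -> c.flux())
                .handle((payload, headers) -> {
                    // Expected to log individual elements rather than "FluxFlatMap".
                    log.info("got something: {}", payload);
                    return null;
                })
                .get();
    }
}
```

On 6.1 and later, according to the migration guide linked above, the explicit `e -> e.async(true)` should no longer be needed when the output channel is a FluxMessageChannel.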