Tags: spring, spring-boot, spring-integration, project-reactor

How to see the types that flow through Spring Integration's IntegrationFlow


I'm trying to understand which type is returned when I aggregate in Spring Integration, and it's proving pretty hard. I'm using Project Reactor, and my code snippet is:

public FluxAggregatorMessageHandler randomIdsBatchAggregator() {
    FluxAggregatorMessageHandler f = new FluxAggregatorMessageHandler();
    f.setWindowTimespan(Duration.ofSeconds(5));
    f.setCombineFunction(messageFlux -> messageFlux
        .map(Message::getPayload)
        .collectList()
        .map(GenericMessage::new));
    return f;
}

@Bean
public IntegrationFlow dataPipeline() {
    return IntegrationFlows.from(somePublisher)
        // ----> Is a Message<?> passed here, or a Flux<Message<?>>?
        .handle(randomIdsBatchAggregator())
        // ----> What type does the aggregation return?
        .handle(bla())
        .get();
}

Beyond understanding the types passed in this example, I want to know how, in general, I can find out which objects flow through an IntegrationFlow and what their types are.


Solution

  • IntegrationFlows.from(somePublisher)
    

    This creates a FluxMessageChannel internally, which subscribes to the provided Publisher. Every single event is emitted from this channel to its subscriber, your aggregator, so the aggregator receives one Message<?> per event rather than a Flux<Message<?>>.

    The FluxAggregatorMessageHandler produces what the setCombineFunction() Javadoc describes:

    /**
     * Configure a transformation {@link Function} to apply for a {@link Flux} window to emit.
     * Requires a {@link Mono} result with a {@link Message} as value as a combination result
     * of the incoming {@link Flux} for window.
     * By default a {@link Flux} for window is fully wrapped into a message with headers copied
     * from the first message in window. Such a {@link Flux} in the payload has to be subscribed
     * and consumed downstream.
     * @param combineFunction the {@link Function} to use for result windows transformation.
     */
    public void setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction) {
    

    So the result is a Mono that carries a message, which is exactly what your .collectList() chain produces. The framework subscribes to that Mono when it emits a reply message from the FluxAggregatorMessageHandler. Therefore your .handle(bla()) must expect a message whose payload is a list of the collected payloads, which is the natural result for an aggregator.

    See more in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#flux-aggregator
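
To make the type change concrete, here is a minimal plain-Java sketch of what the combine function from the question does to one window. It is only an illustration, not Spring Integration code: `Msg` is a hypothetical stand-in for Spring's `Message`, a `java.util.List` stands in for the `Flux` window, and the class and method names are made up for this example. It also shows a simple runtime check (printing the payload's class) that can be dropped into any handler to see what actually arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class WindowTypesSketch {

    // Hypothetical stand-in for org.springframework.messaging.Message<T>.
    public static final class Msg<T> {
        private final T payload;

        public Msg(T payload) {
            this.payload = payload;
        }

        public T getPayload() {
            return payload;
        }
    }

    // Mirrors the combine function from the question: take the payload of each
    // message in one window, collect the payloads into a list, and wrap that
    // list in a single new message.
    public static Msg<List<Object>> combine(List<Msg<?>> window) {
        List<Object> payloads = window.stream()
                .map(m -> (Object) m.getPayload())
                .collect(Collectors.toList());
        return new Msg<>(payloads);
    }

    // Runtime check usable for any message: report the payload's class name.
    public static String payloadType(Msg<?> message) {
        return message.getPayload().getClass().getName();
    }

    public static void main(String[] args) {
        // Assumed sample events; the real ones come from somePublisher.
        List<Msg<?>> window = new ArrayList<>();
        window.add(new Msg<String>("id-1"));
        window.add(new Msg<Integer>(42));

        // Before the aggregator: one message per event.
        for (Msg<?> m : window) {
            System.out.println("in:  " + payloadType(m));
        }

        // After the aggregator: one message whose payload is a List.
        Msg<List<Object>> aggregated = combine(window);
        System.out.println("out: " + payloadType(aggregated)
                + " " + aggregated.getPayload());
    }
}
```

In the real flow the same idea applies: the handler after the aggregator can inspect `message.getPayload().getClass()` once to confirm that it receives a `List`, and then be typed accordingly.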