Search code examples
spring-integrationspring-integration-dsl

Spring Integration Flow - how to call a service and receive response on separate channel


I am trying to build a Spring Integration Flow with DSL where a part of the flow calls an existing service that will process data asynchronously and return the response on a channel. The service call returns a task ID that can be used as a correlation ID to obtain the correct response message on the channel.

I am unsure how to build a flow (which components to use) that will call the service (I assume with a service activator), then take the returned ID and then wait for a message on a different channel that has that ID in the correlation ID header (maybe some sort of aggregator?). I have googled and cannot find anything that seems similar.

Also, my flow will receive a request object that I would like to pair up with the response object to pass along the flow after the response is received.

Request -> service call -> returns task ID ---->|
                |                               |---- (Request+Response) --> More processing
                | (async)                       |
                ---------> Response ----------->|
                           for task ID 
                           on task complete channel

Solution

  • You are correct. The best way to solve your task is really an aggregator pattern:

    https://www.enterpriseintegrationpatterns.com/Aggregator.html

    https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator

    So, you probably need to use a header enricher to instead of a service activator to obtain that task ID in the reply and store it in a header for future correlation of this request and some reply later with the same task ID. Or if you have a property on the request object for this task ID, you can use a content enricher instead: https://docs.spring.io/spring-integration/docs/current/reference/html/message-transformation.html#content-enricher

    Then you send this request object with the task ID to an aggregator where this task ID must be used as a correlation key. The group size of course is just 2 - request and reply.

    Your async service must send a completion to the same aggregator's input channel with. When aggregator encounters a proper correlation key, it will complete the group of two messages and send a single one to its output channel.

    UPDATE

    The aggregator we are talking about must have its own input channel, so you can send a request with task ID for correlation and then from your async service a reply must be sent to the same channel. With Java DSL it is a matter of exposing that input channel for your convenience:

    @Bean
    IntegrationFlow aggregatorFlow() {
        return f -> f
                  .aggregate(...)
                  .channel("correlatedReplyChannel")
    }
    

    This flow implicitly starts with a channel like aggregatorFlow.input. So, you use this name in a channel() definition of request and reply flows as the last EIP-method in their definitions.