I am trying to build a Spring Integration Flow with DSL where a part of the flow calls an existing service that will process data asynchronously and return the response on a channel. The service call returns a task ID that can be used as a correlation ID to obtain the correct response message on the channel.
I am unsure how to build a flow (which components to use) that will call the service (I assume with a service activator), then take the returned ID and then wait for a message on a different channel that has that ID in the correlation ID header (maybe some sort of aggregator?). I have googled and cannot find anything that seems similar.
Also, my flow will receive a request object that I would like to pair up with the response object to pass along the flow after the response is received.
Request -> service call -> returns task ID ---->|
| |---- (Request+Response) --> More processing
| (async) |
---------> Response ----------->|
for task ID
on task complete channel
You are correct. The best way to solve your task is really an aggregator pattern:
https://www.enterpriseintegrationpatterns.com/Aggregator.html
So, you probably need to use a header enricher to instead of a service activator to obtain that task ID
in the reply and store it in a header for future correlation of this request and some reply later with the same task ID
. Or if you have a property on the request object for this task ID
, you can use a content enricher instead: https://docs.spring.io/spring-integration/docs/current/reference/html/message-transformation.html#content-enricher
Then you send this request object with the task ID
to an aggregator where this task ID
must be used as a correlation key. The group size of course is just 2
- request and reply.
Your async service must send a completion to the same aggregator's input channel with. When aggregator encounters a proper correlation key, it will complete the group of two messages and send a single one to its output channel.
UPDATE
The aggregator we are talking about must have its own input channel, so you can send a request with task ID
for correlation and then from your async service a reply must be sent to the same channel. With Java DSL it is a matter of exposing that input channel for your convenience:
@Bean
IntegrationFlow aggregatorFlow() {
return f -> f
.aggregate(...)
.channel("correlatedReplyChannel")
}
This flow implicitly starts with a channel like aggregatorFlow.input
. So, you use this name in a channel()
definition of request and reply flows as the last EIP-method in their definitions.