I have two JMS queues, and my application subscribes to both of them with Jms.messageDrivenChannelAdapter(...).
The first queue receives messages of type Paid. The second queue receives messages of type Reversal.
The business scenario defines a correlation between messages of type Paid and type Reversal: a Reversal should wait for its Paid in order to be processed.
How can I achieve such "wait" pattern with Spring Integration?
Is it possible to correlate messages between 2 JMS queues?
Yes, messages from the two queues can be correlated; see the documentation about the Aggregator.
An aggregator correlates messages using a correlation strategy and releases each group based on a release strategy.
The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed to be complete. At that point, the aggregator creates a single message by processing the whole group and sends the aggregated message as output.
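To make the correlate, store, and release cycle concrete, here is a minimal framework-free sketch in plain Java. It only illustrates the idea and is not Spring Integration's implementation: the class PairAggregatorSketch and its methods are hypothetical names, payloads are plain strings, and the release rule is hard-coded to two messages per correlation id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory model of an aggregator (not the Spring Integration class).
class PairAggregatorSketch {

    // Groups still waiting for their counterpart, keyed by correlation id.
    private final Map<String, List<String>> groups = new HashMap<>();

    // Stores the payload under its correlation id and releases the group
    // once it holds two messages; otherwise the message keeps waiting.
    Optional<List<String>> onMessage(String correlationId, String payload) {
        List<String> group = groups.computeIfAbsent(correlationId, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() == 2) {          // release rule, same idea as "size() == 2"
            groups.remove(correlationId); // the group is complete, so forget it
            return Optional.of(group);
        }
        return Optional.empty();
    }

    // Number of incomplete groups currently held.
    int pendingGroups() {
        return groups.size();
    }
}
```

Whichever of the two messages arrives first simply stays in the map until its counterpart shows up, which is the "wait" behaviour asked about.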
The output payload is a list of the grouped message payloads by default, but you can provide a custom output processor.
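As an illustration of what a custom output processor might do here: the default list follows arrival order, so the Reversal can appear before the Paid (the sample output in this answer shows payload=[two, one]). The plain-Java sketch below reorders a completed group so that Paid comes first. TypedMessage and paidFirst are hypothetical names, and the type field is an assumption about how the two kinds of message are told apart; in Spring Integration, logic like this would be supplied to the aggregator as its output processor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of output-processor logic, independent of any framework.
class OutputProcessorSketch {

    // Assumed message shape: a type ("Paid" or "Reversal") plus a payload.
    static final class TypedMessage {
        final String type;
        final String payload;

        TypedMessage(String type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    // Returns the payloads of a completed group with Paid messages first,
    // regardless of the order in which the messages arrived.
    static List<String> paidFirst(List<TypedMessage> group) {
        List<TypedMessage> sorted = new ArrayList<>(group);
        // false sorts before true, so "Paid" moves to the front; List.sort is stable.
        sorted.sort(Comparator.comparing((TypedMessage m) -> !"Paid".equals(m.type)));
        List<String> payloads = new ArrayList<>();
        for (TypedMessage m : sorted) {
            payloads.add(m.payload);
        }
        return payloads;
    }
}
```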
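    @SpringBootApplication
    public class So55299268Application {

        public static void main(String[] args) {
            SpringApplication.run(So55299268Application.class, args);
        }

        // Both inbound adapters feed the same aggregator input channel.
        @Bean
        public IntegrationFlow in1(ConnectionFactory connectionFactory) {
            return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
                            .destination("queue1"))
                    .channel("aggregator.input")
                    .get();
        }

        @Bean
        public IntegrationFlow in2(ConnectionFactory connectionFactory) {
            return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
                            .destination("queue2"))
                    .channel("aggregator.input")
                    .get();
        }

        // The correlation header, timeout and discard channel were lost from this listing;
        // the values here are assumptions inferred from the output below.
        @Bean
        public IntegrationFlow aggregator() {
            return f -> f
                    .aggregate(a -> a
                            .correlationExpression("headers.jms_correlationId")
                            .releaseExpression("size() == 2")
                            .groupTimeout(5_000L) // assumption: 5 s, consistent with the timestamps below
                            .discardChannel("discards.input"))
                    .handle(System.out::println);
        }

        // Receives the messages of groups that timed out before their counterpart arrived.
        @Bean
        public IntegrationFlow discards() {
            return f -> f.handle((p, h) -> {
                System.out.println("Aggregation timed out for " + p);
                return null;
            });
        }

        @Bean
        public ApplicationRunner runner(JmsTemplate template) {
            return args -> {
                send(template, "one", "two"); // a complete pair
                send(template, "three", null); // no counterpart, so this one times out
            };
        }

        // Sends both messages with the same JMS correlation id so the aggregator can pair them.
        private void send(JmsTemplate template, String one, String two) {
            template.convertAndSend("queue1", one, m -> {
                m.setJMSCorrelationID(one);
                return m;
            });
            if (two != null) {
                template.convertAndSend("queue2", two, m -> {
                    m.setJMSCorrelationID(one);
                    return m;
                });
            }
        }

    }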
Output:

GenericMessage [payload=[two, one], headers={jms_redelivered=false, jms_destination=queue://queue1, jms_correlationId=one, id=784535fe-8861-1b22-2cfa-cc2e67763674, priority=4, jms_timestamp=1553290921442, jms_messageId=ID:Gollum2.local-55540-1553290921241-4:1:3:1:1, timestamp=1553290921457}]
2019-03-22 17:42:06.460 INFO 55396 --- [ask-scheduler-1] o.s.i.a.AggregatingMessageHandler : Expiring MessageGroup with correlationKey[three]
Aggregation timed out for three
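The expiry shown in the last two lines can be modelled in a few lines of plain Java. This is a simplified sketch, not how Spring Integration schedules timeouts: GroupTimeoutSketch is a hypothetical class, time is passed in explicitly instead of being read from a clock, the timeout is measured from the most recent arrival in a group, and complete groups are not released because only the timeout path is of interest here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of group expiry with a discard path (not the Spring Integration class).
class GroupTimeoutSketch {

    private static final class Group {
        long lastArrivalMillis;
        final List<String> payloads = new ArrayList<>();
    }

    private final long timeoutMillis;
    private final Map<String, Group> groups = new HashMap<>();

    GroupTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Adds a payload to its group and records when it arrived.
    void add(String correlationId, String payload, long nowMillis) {
        Group group = groups.computeIfAbsent(correlationId, k -> new Group());
        group.payloads.add(payload);
        group.lastArrivalMillis = nowMillis;
    }

    // Removes every group that has been idle for at least the timeout
    // and returns the payloads of those groups as discards.
    List<String> expire(long nowMillis) {
        List<String> discarded = new ArrayList<>();
        Iterator<Group> it = groups.values().iterator();
        while (it.hasNext()) {
            Group group = it.next();
            if (nowMillis - group.lastArrivalMillis >= timeoutMillis) {
                discarded.addAll(group.payloads);
                it.remove();
            }
        }
        return discarded;
    }
}
```

With a timeout of 5000 ms, a lone "three" added at time 0 is still held at 4999 ms and is handed back as a discard at 5000 ms, mirroring the unmatched message in the log.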