Search code examples
jmsspring-integrationspring-integration-dsl

Correlate messages between 2 JMS queues using Spring integration components


I have 2 JMS queues and my application subscribes to both of them with Jms.messageDrivenChannelAdapter(...) component.

First queue receives messages of type Paid. Second queue receives messages of type Reversal.

Business scenario defines correlation between messages of type Paid and type Reversal.

Reversal should wait for Paid in order to be processed.

How can I achieve such "wait" pattern with Spring Integration?

Is it possible to correlate messages between 2 JMS queues?


Solution

  • See the documentation about the Aggregator.

    The aggregator correlates messages using some correlation strategy and releases the group based on some release strategy.

    The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed to be complete. At that point, the aggregator creates a single message by processing the whole group and sends the aggregated message as output.

    The output payload is a list of the grouped message payloads by default, but you can provide a custom output processor.

    EDIT

    @SpringBootApplication
    public class So55299268Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So55299268Application.class, args);
        }
    
        @Bean
        public IntegrationFlow in1(ConnectionFactory connectionFactory) {
            return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
                        .destination("queue1"))
                    .channel("aggregator.input")
                    .get();
        }
    
        @Bean
        public IntegrationFlow in2(ConnectionFactory connectionFactory) {
            return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory)
                        .destination("queue2"))
                    .channel("aggregator.input")
                    .get();
        }
    
        @Bean
        public IntegrationFlow aggregator() {
            return f -> f
                    .aggregate(a -> a
                            .correlationExpression("headers.jms_correlationId")
                            .releaseExpression("size() == 2")
                            .expireGroupsUponCompletion(true)
                            .expireGroupsUponTimeout(true)
                            .groupTimeout(5_000L)
                            .discardChannel("discards.input"))
                    .handle(System.out::println);
        }
    
        @Bean
        public IntegrationFlow discards() {
            return f -> f.handle((p, h) -> {
                System.out.println("Aggregation timed out for " + p);
                return null;
            });
        }
    
        @Bean
        public ApplicationRunner runner(JmsTemplate template) {
            return args -> {
                send(template, "one", "two");
                send(template, "three", null);
            };
        }
    
        private void send(JmsTemplate template, String one, String two) {
            template.convertAndSend("queue1", one, m -> {
                m.setJMSCorrelationID(one);
                return m;
            });
            if (two != null) {
                template.convertAndSend("queue2", two, m -> {
                    m.setJMSCorrelationID(one);
                    return m;
                });
            }
        }
    
    }
    

    and

    GenericMessage [payload=[two, one], headers={jms_redelivered=false, jms_destination=queue://queue1, jms_correlationId=one, id=784535fe-8861-1b22-2cfa-cc2e67763674, priority=4, jms_timestamp=1553290921442, jms_messageId=ID:Gollum2.local-55540-1553290921241-4:1:3:1:1, timestamp=1553290921457}]

    2019-03-22 17:42:06.460 INFO 55396 --- [ask-scheduler-1] o.s.i.a.AggregatingMessageHandler : Expiring MessageGroup with correlationKey[three]

    Aggregation timed out for three