Search code examples
spring-integrationspring-integration-dsl

Spring Integration Load Balance to JMS Queues


I would like to take JMS msgs from a single input queue and fan them out onto N output queues.

I have a simple flow that will forward messages to a single destination but can not figure out how to apply LoadBalancer to allow for multiple destinations in round-robin fashion.

Any ideas how to do this?

@Configuration
public class TestLoadBalance {

    public static final String INPUT_QUEUE = "_dev.lb.input";
    public static final String OUTPUT_QUEUE_PREFIX = "_dev.lb.output-";


    @Bean
    public IntegrationFlow testLoadBalanceFlow(
            ConnectionFactory jmsConnectionFactory) {

        IntegrationFlow flow =  IntegrationFlows.from(
                Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
                        .destination(INPUT_QUEUE)
        )
                .handle(buildOutput(jmsConnectionFactory, 1))
                // cant have 2nd handle. gets warn & flow end:
                // The 'currentComponent' (org.springframework.integration.jms.JmsSendingMessageHandler@516462cc) 
                // is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'
                //.handle(buildOutput(jmsConnectionFactory, 2))
                .get();
        return flow;
    }


    private JmsSendingMessageHandler buildOutput(ConnectionFactory jmsConnectionFactory, int i){
        return Jms.outboundAdapter(jmsConnectionFactory)
                .destination(OUTPUT_QUEUE_PREFIX + i).get();
    }
}

Solution

  • There are a couple of ways to do it; you can either have multiple subscribers on the channel...

    @Bean
    public IntegrationFlow inbound(ConnectionFactory cf) {
        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(cf)
                    .destination("foo"))
                .channel(roundRobin())
                .get();
    }
    
    @Bean
    public DirectChannel roundRobin() {
        return new DirectChannel();
    }
    
    @Bean
    public IntegrationFlow outbound1(ConnectionFactory cf) {
        return IntegrationFlows.from(roundRobin())
                .bridge() // otherwise log() will wire tap the roundRobin channel
                .log()
                .log(new LiteralExpression("Sending to bar"))
                .handle(Jms.outboundAdapter(cf)
                        .destination("bar"))
                .get();
    }
    
    @Bean
    public IntegrationFlow outbound2(ConnectionFactory cf) {
        return IntegrationFlows.from(roundRobin())
                .bridge() // otherwise log() will wire tap the roundRobin channel
                .log()
                .log(new LiteralExpression("Sending to baz"))
                .handle(Jms.outboundAdapter(cf)
                        .destination("baz"))
                .get();
    }
    

    Or, you can use a destination expression:

    @Bean
    public AtomicInteger toggle() {
        return new AtomicInteger();
    }
    
    @Bean
    public IntegrationFlow inbound(ConnectionFactory cf) {
        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(cf)
                        .destination("foo"))
                .handle(Jms.outboundAdapter(cf)
                        .destinationExpression("@toggle.getAndIncrement() % 2 == 0 ? 'bar' : 'baz'"))
                .get();
    }
    
    @JmsListener(destination = "bar")
    public void bar(String in) {
        System.out.println("received " + in + " from bar");
    }
    
    @JmsListener(destination = "baz")
    public void baz(String in) {
        System.out.println("received " + in + " from baz");
    }
    

    Result:

    received test1 from bar
    received test2 from baz