Search code examples
javaspringspring-bootspring-integrationenterprise-integration

is there any out of the box spring ingetration pattern for consuming message from dynamic queue and process them?


I am trying to achieve a scenario using spring integration where it has to process redis queue(s) generated on the fly. The examples I have found so far on internet works on predefined queues.

In my situation, there are more than 100 redis queues are generated on the fly by an application, and my code will consume messages from those queues. I have managed to create a POC kind of project(github link) and it is working.

I would like to know if there is a better way to achieve the same. As far as I have seen, enterprise integration pattern does not say anything about consuming message from multiple dynamic queues or message sources, is there any out of the box solution other than customizing or altering existing framework source code?

I prefer to use Spring Integration Java configuration and DSL over xml.


Solution

  • See Dynamic and Runtime Integration Flows.

    To simplify the development experience, Spring Integration introduced IntegrationFlowContext to register and manage IntegrationFlow instances at runtime, as the following example shows:

    @Autowired
    private AbstractServerConnectionFactory server1;
    
    @Autowired
    private IntegrationFlowContext flowContext;
    
    ...
    
    @Test
    public void testTcpGateways() {
        TestingUtilities.waitListening(this.server1, null);
    
        IntegrationFlow flow = f -> f
                .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                        .serializer(TcpCodecs.crlf())
                        .deserializer(TcpCodecs.lengthHeader1())
                        .id("client1"))
                    .remoteTimeout(m -> 5000))
                .transform(Transformers.objectToString());
    
        IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
        assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
    }
    

    This is useful when we have multiple configuration options and have to create several instances of similar flows. To do so, we can iterate our options and create and register IntegrationFlow instances within a loop. Another variant is when our source of data is not Spring-based and we must create it on the fly. ...

    EDIT

    @SpringBootApplication
    public class So59117728Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So59117728Application.class, args).close();
        }
    
        @Bean
        public ApplicationRunner runner(RedisConnectionFactory cf, IntegrationFlowContext context,
                RedisTemplate<String, String> template) {
    
            return args -> {
                IntegrationFlow flow = IntegrationFlows
                        .from(redisEndpoint("So59117728Application", cf))
                        .handle(System.out::println)
                        .get();
                context.registration(flow).id("myDynamicFlow").register();
                template.boundListOps("So59117728Application").leftPush("foo");
    
                Thread.sleep(10_000);
                context.remove("myDynamicFlow");
            };
        }
    
        private RedisQueueMessageDrivenEndpoint redisEndpoint(String queueName, RedisConnectionFactory cf) {
            RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint(queueName, cf);
            endpoint.setSerializer(new StringRedisSerializer());
            return endpoint;
        }
    
    }