
How to periodically publish a message to activemq spring integration DSL


I installed ActiveMQ and want to send a message to "my.queue" periodically, say every 10 seconds.

I'm struggling to understand the Spring Integration DSL.

I need something like

IntegrationFlows.from(every 5 seconds)
 .send(message to "my.queue")

Solution

  • Yes, you can do that with the Spring Integration Java DSL and its IntegrationFlow abstraction. To run a periodic task, start the flow with this factory method from IntegrationFlows:

    /**
     * Provides {@link Supplier} as source of messages to the integration flow.
     * which will be triggered by a <b>provided</b>
     * {@link org.springframework.integration.endpoint.SourcePollingChannelAdapter}.
     * @param messageSource the {@link Supplier} to populate.
     * @param endpointConfigurer the {@link Consumer} to provide more options for the
     * {@link org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean}.
     * @param <T> the supplier type.
     * @return new {@link IntegrationFlowBuilder}.
     * @see Supplier
     */
    public static <T> IntegrationFlowBuilder fromSupplier(Supplier<T> messageSource,
            Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer) {
    

    The Supplier returns the object you'd like to send downstream as the message payload. The second argument, the endpointConfigurer Consumer, is where you configure the poller:

    .poller(p -> p.fixedDelay(1000))
    

    This way, a message is created from the supplied payload every second and sent downstream.

    To send a message to ActiveMQ, use the org.springframework.integration.jms.dsl.Jms factory and its method for the outbound channel adapter:

    /**
     * The factory to produce a {@link JmsOutboundChannelAdapterSpec}.
     * @param connectionFactory the JMS ConnectionFactory to build on
     * @return the {@link JmsOutboundChannelAdapterSpec} instance
     */
    public static JmsOutboundChannelAdapterSpec.JmsOutboundChannelSpecTemplateAware outboundAdapter(
            ConnectionFactory connectionFactory) {
    

    The result of this factory is then passed to the DSL's handle() method:

    /**
     * Populate a {@link ServiceActivatingHandler} for the provided
     * {@link MessageHandler} implementation.
     * Can be used as Java 8 Lambda expression:
     * <pre class="code">
     * {@code
     *  .handle(m -> logger.info(m.getPayload())
     * }
     * </pre>
     * @param messageHandler the {@link MessageHandler} to use.
     * @return the current {@link BaseIntegrationFlowDefinition}.
     */
    public B handle(MessageHandler messageHandler) {
    

    All the info is present in the docs: https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl

    Something like this:

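    @Bean
    public IntegrationFlow jmsPeriodicFlow() {
        // Poll the supplier every 5 seconds; each "hello" becomes a message payload.
        return IntegrationFlows.fromSupplier(() -> "hello",
                                   e -> e.poller(p -> p.fixedDelay(5000)))
                // jmsConnectionFactory() is assumed to be defined elsewhere in this configuration class.
                .handle(Jms.outboundAdapter(jmsConnectionFactory())
                        .destination("my.queue"))
                .get();
    }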