Tags: spring, spring-integration, spring-integration-dsl

Multithreaded Executor channel to speed up the consumer process


I have a message producer that produces around 15 messages/second.

The consumer is a Spring Integration project that consumes from the message queue and does a lot of processing. Currently it is single-threaded and cannot keep up with the rate at which the producer sends messages, so the queue depth keeps increasing.

        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
                .wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
                .filter(ingFilter, "filterMessageOnEvent").transform(eventHandler, "parseEvent")
                .aggregate(a -> a.correlationStrategy(corStrgy, "getCorrelationKey").releaseStrategy(g -> {

                    boolean eonExists = g.getMessages().stream()
                            .anyMatch(eon -> ((FlightModel) eon.getPayload()).getEstGmtOnDtm() != null);
                    if (eonExists) {
                        boolean einExists = g.getMessages().stream()
                                .anyMatch(ein -> ((FlightModel) ein.getPayload()).getEstGmtInDtm() != null);
                        if (einExists) {
                            return true;
                        }
                    }
                    return false;
                }).messageStore(this.messageStore)).channel("AggregatorEventChannel").get();

Is it possible to use an executor channel to process this in a multithreaded environment and speed up the consumer?

If yes, please suggest how I can achieve this. To preserve message ordering, I need to assign messages of the same type (based on the id of the message) to the same thread of the executor channel.

[UPDATED CODE] I have created the following executor channels:

    public static final MessageChannel SKW_DEFAULT_CHANNEL = MessageChannels
            .executor(ASQ_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();

    public static final MessageChannel RPA_DEFAULT_CHANNEL = MessageChannels
            .executor(ASH_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();

From the main message flow I now route to a custom router, which forwards each message to an executor channel as shown below:

    @Bean
    public IntegrationFlow baseEventFlow1() {

        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
                .wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
                .filter(ingFilter, "filterMessageOnEvent").route(router()).get();
    }

    public AbstractMessageRouter router() {
        return new AbstractMessageRouter() {
            @Override
            protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
                if (message.getPayload().toString().contains("\"id\":\"RPA")) {
                    return Collections.singletonList(RPA_DEFAULT_CHANNEL);
                } else if (message.getPayload().toString().contains("\"id\":\"SKW")) {
                    return Collections.singletonList(SKW_DEFAULT_CHANNEL);
                } else {
                    return Collections.singletonList(new NullChannel());
                }
            }

        };
    }

I will have an individual consumer flow for each executor channel.

Please correct my understanding.

[UPDATED]

    @Bean
    @BridgeTo("uaxDefaultChannel")
    public MessageChannel ucaDefaultChannel() {
        return MessageChannels.executor(UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
    }

    @Bean
    @BridgeTo("uaDefaultChannel")
    public MessageChannel ualDefaultChannel() {
        return MessageChannels.executor(UAL_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
    }

    @Bean
    public IntegrationFlow uaEventFlow() {
        return IntegrationFlows.from("uaDefaultChannel").wireTap(UA_WIRE_TAP_CHNL)
                .transform(eventHandler, "parseEvent")
                // ... remaining steps of the flow are not shown in the original post
                .get();
    }

So @BridgeTo on each executor channel will forward the messages to the named channel.


Solution

  • "hence the queue depth keeps on increasing"

    Since your queue appears to live on a JMS broker, this behavior is perfectly fine. That is exactly what messaging systems are designed for: to decouple the producer from the consumer and let messages wait in a destination until they can be processed.

    If you want to consume from JMS faster, consider the concurrency options on the JMS container:

    /**
     * The concurrency to use.
     * @param concurrency the concurrency.
     * @return current {@link JmsDefaultListenerContainerSpec}.
     * @see DefaultMessageListenerContainer#setConcurrency(String)
     */
    public JmsDefaultListenerContainerSpec concurrency(String concurrency) {
        this.target.setConcurrency(concurrency);
        return this;
    }
    
    /**
     * The concurrent consumers number to use.
     * @param concurrentConsumers the concurrent consumers count.
     * @return current {@link JmsDefaultListenerContainerSpec}.
     * @see DefaultMessageListenerContainer#setConcurrentConsumers(int)
     */
    public JmsDefaultListenerContainerSpec concurrentConsumers(int concurrentConsumers) {
        this.target.setConcurrentConsumers(concurrentConsumers);
        return this;
    }
    
    /**
     * The max for concurrent consumers number to use.
     * @param maxConcurrentConsumers the max concurrent consumers count.
     * @return current {@link JmsDefaultListenerContainerSpec}.
     * @see DefaultMessageListenerContainer#setMaxConcurrentConsumers(int)
     */
    public JmsDefaultListenerContainerSpec maxConcurrentConsumers(int maxConcurrentConsumers) {
        this.target.setMaxConcurrentConsumers(maxConcurrentConsumers);
        return this;
    }
    

    See the docs for more information: https://docs.spring.io/spring/docs/5.2.3.RELEASE/spring-framework-reference/integration.html#jms-receiving
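    As a rough illustration of the options above, here is how they might be applied to the container from the question. This is a configuration sketch, not a tested setup: the class name, the bean name, the `Queue` type of `emsQueue`, and the values 2 and 5 are all assumptions, and the rest of the original flow is left out.

```java
import javax.jms.ConnectionFactory;
import javax.jms.Queue;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

// Hypothetical configuration class; only the container setup matters here.
@Configuration
public class ConcurrentJmsFlowConfig {

    private final ConnectionFactory emsConnectionFactory;
    private final Queue emsQueue; // assumed type, the question does not show it

    public ConcurrentJmsFlowConfig(ConnectionFactory emsConnectionFactory, Queue emsQueue) {
        this.emsConnectionFactory = emsConnectionFactory;
        this.emsQueue = emsQueue;
    }

    @Bean
    public IntegrationFlow concurrentEventFlow() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(
                        Jms.container(this.emsConnectionFactory, this.emsQueue)
                                .concurrentConsumers(2)     // start with 2 listener threads (arbitrary value)
                                .maxConcurrentConsumers(5)  // scale up to 5 under load (arbitrary value)
                                .get()))
                // filter / transform / aggregate steps from the question would go here
                .channel("AggregatorEventChannel")
                .get();
    }
}
```

    With more than one consumer, messages are handled on whichever listener thread picks them up, so this alone gives throughput but no per-id ordering.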

    But that won't allow you to "assign messages to a specific thread": there is simply no way to partition messages in JMS.

    We can do that with Spring Integration by using a router keyed on your "id of the message" together with several ExecutorChannel instances, each configured with a single-threaded Executor. Each ExecutorChannel gets its own dedicated executor with only one thread. This way you preserve the order of messages that share a partition key while processing different keys in parallel. All the ExecutorChannel instances can have the same subscriber, or bridge to the same channel for processing.

    However, keep in mind that once you leave the JMS listener thread, the JMS transaction is finished. If you then fail to process a message in that separate thread, you may lose the message.
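    The idea behind the router plus single-threaded executors can be shown with plain JDK classes. The sketch below is not Spring Integration code: `KeyPartitionedDispatcher` and its methods are hypothetical names, and the keys "RPA" and "SKW" are borrowed from the question. It only demonstrates that one single-threaded executor per key keeps per-key order while different keys run in parallel.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: plays the role of "router + one ExecutorChannel per key".
final class KeyPartitionedDispatcher {

    // One single-threaded executor ("lane") per partition key, created lazily.
    private final Map<String, ExecutorService> lanes = new ConcurrentHashMap<>();

    // Tasks with the same key always land on the same single thread,
    // so they run in the order in which they were dispatched.
    void dispatch(String key, Runnable task) {
        lanes.computeIfAbsent(key, k -> Executors.newSingleThreadExecutor()).execute(task);
    }

    // Stops accepting work and waits for every lane to drain.
    boolean shutdownAndAwait(long timeoutSeconds) {
        lanes.values().forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService lane : lanes.values()) {
                if (!lane.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Dispatches 100 numbered tasks, alternating between two keys,
    // and records the order in which each key's tasks actually ran.
    static Map<String, List<Integer>> runDemo() {
        KeyPartitionedDispatcher dispatcher = new KeyPartitionedDispatcher();
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        seen.put("RPA", new CopyOnWriteArrayList<>());
        seen.put("SKW", new CopyOnWriteArrayList<>());
        for (int i = 0; i < 100; i++) {
            final int sequence = i;
            final String key = (i % 2 == 0) ? "RPA" : "SKW";
            dispatcher.dispatch(key, () -> seen.get(key).add(sequence));
        }
        if (!dispatcher.shutdownAndAwait(5)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return seen;
    }
}
```

    Calling `KeyPartitionedDispatcher.runDemo()` returns the even sequence numbers in ascending order under "RPA" and the odd ones under "SKW". In the Spring Integration setup, the router picks the lane and each ExecutorChannel owns the single-threaded executor.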
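    The reason for that risk can be seen with a plain JDK executor. In the minimal sketch below (`HandOffFailureDemo` is a hypothetical name, and no JMS or Spring classes are involved), the hand-off call returns normally even though the task later fails on the worker thread, so the calling thread has nothing to roll back.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, not part of any library.
final class HandOffFailureDemo {

    // Hands a failing task to a worker thread and returns the failure message
    // recovered from the Future, or null if the task did not fail.
    static String handOffFailingTask() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Runnable failingTask = () -> {
                throw new IllegalStateException("processing failed");
            };
            // submit() returns normally: the calling thread (the JMS listener thread
            // in the scenario above) never sees the exception thrown by the task.
            Future<?> outcome = worker.submit(failingTask);
            try {
                // The failure only becomes visible to code that inspects the Future.
                outcome.get(5, TimeUnit.SECONDS);
                return null;
            } catch (ExecutionException e) {
                return e.getCause().getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (TimeoutException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            worker.shutdown();
        }
    }
}
```

    Blocking on the Future here is only for demonstration. Spring Integration has its own error handling for executor channels, which this sketch does not model, so if losing messages is unacceptable, failures on the worker threads need explicit handling, for example by storing or re-queuing the failed message.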