I have a message producer which produces around 15 messages/second
The consumer is a spring integration project which consumes from the Message Queue and does a lot of processing. Currently it is single threaded and not able to match with the rate at which the producer are sending the messages. hence the queue depth keeps on increasing
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
.filter(ingFilter, "filterMessageOnEvent").transform(eventHandler, "parseEvent")
.aggregate(a -> a.correlationStrategy(corStrgy, "getCorrelationKey").releaseStrategy(g -> {
boolean eonExists = g.getMessages().stream()
.anyMatch(eon -> ((FlightModel) eon.getPayload()).getEstGmtOnDtm() != null);
if (eonExists) {
boolean einExists = g.getMessages().stream()
.anyMatch(ein -> ((FlightModel) ein.getPayload()).getEstGmtInDtm() != null);
if (einExists) {
return true;
}
}
return false;
}).messageStore(this.messageStore)).channel("AggregatorEventChannel").get();
is it possible to use executor channel to process this in a multithreaded environment and speed up the consumer process
If yes, please suggest how can i achieve - To ensure ordering of the messages I need to assign the messages of same type (based on the id of the message) to the same thread of the executor channel.
[UPDATED CODE] I have created the below executor channels
public static final MessageChannel SKW_DEFAULT_CHANNEL = MessageChannels
.executor(ASQ_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
public static final MessageChannel RPA_DEFAULT_CHANNEL = MessageChannels
.executor(ASH_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
Now from the main message flow I redirected to a custom router which forwards the message to Executor channel as shown below -
@Bean
public IntegrationFlow baseEventFlow1() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(Jms.container(this.emsConnectionFactory, this.emsQueue).get()))
.wireTap(FLTAWARE_WIRE_TAP_CHNL)// push raw fa data
.filter(ingFilter, "filterMessageOnEvent").route(route()).get();
}
public AbstractMessageRouter router() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
if (message.getPayload().toString().contains("\"id\":\"RPA")) {
return Collections.singletonList(RPA_DEFAULT_CHANNEL);
} else if (message.getPayload().toString().contains("\"id\":\"SKW")) {
return Collections.singletonList(SKW_DEFAULT_CHANNEL);
} else {
return Collections.singletonList(new NullChannel());
}
}
};
}
I will have individual consumer flow for the corresponding executor channel.
Please correct my understaning
[UPDATED]
@Bean
@BridgeTo("uaxDefaultChannel")
public MessageChannel ucaDefaultChannel() {
return MessageChannels.executor(UCA_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
@BridgeTo("uaDefaultChannel")
public MessageChannel ualDefaultChannel() {
return MessageChannels.executor(UAL_DEFAULT_CHANNEL_NAME, Executors.newFixedThreadPool(1)).get();
}
@Bean
public IntegrationFlow uaEventFlow() {
return IntegrationFlows.from("uaDefaultChannel").wireTap(UA_WIRE_TAP_CHNL)
.transform(eventHandler, "parseEvent")
}
So BridgeTo on the executor channel will forward the messages
hence the queue depth keeps on increasing
Since it looks like your queue is somewhere on JMS broker that is really OK to have such a behavior. That's exactly for what messaging systems have been designed - to distinguish producer and consumer and deal with messages in a destination whenever it is possible.
if you want to increase a polling from JMS, you can consider to have a concurrency
option on the JMS container:
/**
* The concurrency to use.
* @param concurrency the concurrency.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setConcurrency(String)
*/
public JmsDefaultListenerContainerSpec concurrency(String concurrency) {
this.target.setConcurrency(concurrency);
return this;
}
/**
* The concurrent consumers number to use.
* @param concurrentConsumers the concurrent consumers count.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setConcurrentConsumers(int)
*/
public JmsDefaultListenerContainerSpec concurrentConsumers(int concurrentConsumers) {
this.target.setConcurrentConsumers(concurrentConsumers);
return this;
}
/**
* The max for concurrent consumers number to use.
* @param maxConcurrentConsumers the max concurrent consumers count.
* @return current {@link JmsDefaultListenerContainerSpec}.
* @see DefaultMessageListenerContainer#setMaxConcurrentConsumers(int)
*/
public JmsDefaultListenerContainerSpec maxConcurrentConsumers(int maxConcurrentConsumers) {
this.target.setMaxConcurrentConsumers(maxConcurrentConsumers);
return this;
}
See more info the Docs: https://docs.spring.io/spring/docs/5.2.3.RELEASE/spring-framework-reference/integration.html#jms-receiving
But that won't allow you to "asign messages to the specific thread". There is just like no way to partition in JMS.
We can do that with Spring Integration using router
according your "based on the id of the message" and particular ExecutorChannel
instances configured with a singled-threaded Executor
. Every ExecutorChannel
is going to be its dedicated executor with only single thread. This way you will ensure an order for messages with the same partition key and you'll process them in parallel. All the ExecutorChannel
can have the same subscriber or bridge
to the same channel for processing.
However you need to keep in mind that when you are leaving JMS listener thread, you finish JMS transaction and you fail to process a message in that separate thread you may lose a message.