I read from one topic with 10 consumers and forward the messages to one queue. When I send 50 messages to the topic with JMeter, the queue ends up with 500 messages, so each consumer reads the same messages from the topic and then sends them to the queue. Can each consumer read different messages from the topic?
Thank you so much.
JmsConfig.java
@Configuration
@EnableJms
@ComponentScan(basePackages = "com.jms.config")
public class JmsConfig {
String BROKER_URL = "tcp://localhost:61616";
String BROKER_USERNAME = "admin";
String BROKER_PASSWORD = "admin";
@Bean
public ActiveMQConnectionFactory connectionFactory(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(BROKER_URL);
connectionFactory.setUserName(BROKER_USERNAME);
connectionFactory.setPassword(BROKER_PASSWORD);
return connectionFactory;
}
@Bean
public JmsTemplate jmsTemplate(){
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setPubSubDomain(true);
return template;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("1-10");
factory.setPubSubDomain(true);
return factory;
}
}
JmsSender.java
@Service
public class JmsSender{
private JmsTemplate jmsTemplate;
@Value("#{appProperties.toQueueName}")
private String queueName;
@Autowired
private ApplicationContextUtil applicationContextUtil;
public void send(String rawData){
getJmsTemplate().send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(rawData);
}
});
}
public JmsTemplate getJmsTemplate(){
if (jmsTemplate == null){
jmsTemplate = (JmsTemplate) applicationContextUtil.getBeanFromAppContext("jmsForQueue");
}
return jmsTemplate;
}
}
Worker.java
@Component
public class Worker {
@Autowired
private JmsSender jmsSender;
@JmsListener(destination = "#{appProperties.fromTopicName}")
public void receiveMessageFromTopic(final String jsonMessage) throws JMSException {
System.out.println("Received message " + jsonMessage);
// Forward the received payload to the target queue.
jmsSender.send(jsonMessage);
}
}
QueueConfig.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:cache="http://www.springframework.org/schema/cache"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.3.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.3.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.4.0.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-4.3.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd
http://www.springframework.org/schema/cache
http://www.springframework.org/schema/cache/spring-cache.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.3.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"
default-lazy-init="false">
<bean id="brokerUrl" class="java.lang.String">
<constructor-arg value="#{appProperties.queueUrl}"/>
</bean>
<amq:connectionFactory id="amqConnectionFactory" brokerURL="#brokerUrl" dispatchAsync="true"/>
<bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<constructor-arg ref="amqConnectionFactory"/>
<property name="maxConnections" value="#{appProperties.maxConnections}"/>
<property name="idleTimeout" value="#{appProperties.idleTimeout}"/>
<property name="maximumActiveSessionPerConnection" value = "10"/>
</bean>
<bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory1"/>
</bean>
<bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="#{appProperties.toQueueName}"/>
</bean>
</beans>
The behavior you're seeing is expected. A JMS topic follows publish/subscribe (i.e. pub/sub) semantics where all subscribers get all the messages sent to the topic. In your case, you have 10 subscribers and you're sending 50 messages. Each of these 10 subscribers receives each of the 50 messages (in accordance with pub/sub semantics) and then forwards it to a queue. Therefore, the queue receives 500 messages.
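To make the arithmetic concrete, here is a small plain-Java simulation of the two delivery models. It does not use JMS or ActiveMQ at all; the class and method names (`TopicVsQueueDemo`, `forwardedViaTopic`, `forwardedViaQueue`) are made up for illustration, and the round-robin distribution in the queue case is only an assumption to keep the sketch simple (a real broker decides which consumer gets which message).

```java
import java.util.ArrayList;
import java.util.List;

class TopicVsQueueDemo {

    // Topic (pub/sub): every subscriber receives its own copy of every message
    // and forwards it, so the target queue gets subscribers * messages entries.
    static int forwardedViaTopic(int subscribers, int messages) {
        List<String> targetQueue = new ArrayList<>();
        for (int m = 0; m < messages; m++) {
            for (int s = 0; s < subscribers; s++) {
                targetQueue.add("msg-" + m + " forwarded by subscriber-" + s);
            }
        }
        return targetQueue.size();
    }

    // Queue (point-to-point): each message is delivered to exactly one consumer,
    // so the target queue gets one entry per message.
    static int forwardedViaQueue(int consumers, int messages) {
        List<String> targetQueue = new ArrayList<>();
        for (int m = 0; m < messages; m++) {
            int s = m % consumers; // round-robin, purely illustrative
            targetQueue.add("msg-" + m + " forwarded by consumer-" + s);
        }
        return targetQueue.size();
    }

    public static void main(String[] args) {
        System.out.println("topic: " + forwardedViaTopic(10, 50)); // prints "topic: 500"
        System.out.println("queue: " + forwardedViaQueue(10, 50)); // prints "queue: 50"
    }
}
```

With 10 subscribers and 50 messages the topic model produces 500 forwarded messages, which matches what you observe; the queue model produces 50.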
If you want the consumers to share the messages, so that each message is handled by only one of them, then you shouldn't use a JMS topic but rather a JMS queue.
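On the Spring side, a minimal sketch of that change could look like the following. It assumes the producer (JMeter in your case) is switched to send to a queue, and that the `connectionFactory` bean from your `JmsConfig` is reused; the class name `QueueListenerConfig` and the bean name `queueListenerContainerFactory` are hypothetical.

```java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class QueueListenerConfig {

    // Hypothetical bean name; the only functional difference from the
    // original factory is pubSubDomain = false (queue semantics).
    @Bean
    public DefaultJmsListenerContainerFactory queueListenerContainerFactory(
            ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Up to 10 concurrent consumers compete for messages on the same queue.
        factory.setConcurrency("1-10");
        // false = point-to-point (queue), true = publish/subscribe (topic).
        factory.setPubSubDomain(false);
        return factory;
    }
}
```

The listener would then reference this factory, for example `@JmsListener(destination = "...", containerFactory = "queueListenerContainerFactory")`, with the destination pointing at a queue name. The 10 concurrent consumers then compete for messages instead of each receiving a copy, so 50 messages in should give 50 messages out.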