Search code examples
java, spring-mvc, jms, activemq-classic, jms-topic

How to read messages from a topic with multiple consumers?


I read from one topic with 10 consumers and forward the messages to one queue. When I send 50 messages to the topic with JMeter, the queue ends up with 500 messages. So each consumer reads the same messages from the topic and then sends them to the queue. Can each consumer read different messages from the topic?

Thank you so much.

JmsConfig.java

@Configuration
@EnableJms
@ComponentScan(basePackages = "com.jms.config")
public class JmsConfig {
    String BROKER_URL = "tcp://localhost:61616";
    String BROKER_USERNAME = "admin";
    String BROKER_PASSWORD = "admin";

    @Bean
    public ActiveMQConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(BROKER_URL);
        connectionFactory.setPassword(BROKER_USERNAME);
        connectionFactory.setUserName(BROKER_PASSWORD);
        return connectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory());
        template.setPubSubDomain(true);
        return template;
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1-10");
        factory.setPubSubDomain(true);
        return factory;
    }

JmsSender.java

/**
 * Sends raw payloads to the destination named by {@code appProperties.toQueueName}.
 *
 * <p>The {@link JmsTemplate} is not injected; it is fetched by bean name from the
 * application context the first time it is needed and then cached in a field.
 */
@Service
public class JmsSender{

    @Autowired
    private ApplicationContextUtil applicationContextUtil;

    @Value("#{appProperties.toQueueName}")
    private String queueName;

    // Cached after the first lookup in getJmsTemplate().
    private JmsTemplate jmsTemplate;

    /**
     * Wraps {@code rawData} in a JMS object message and sends it to {@code queueName}.
     *
     * @param rawData the payload to forward
     */
    public void send(String rawData){
        MessageCreator objectMessageCreator = session -> session.createObjectMessage(rawData);
        getJmsTemplate().send(queueName, objectMessageCreator);
    }

    /**
     * Returns the template registered under the bean name {@code "jmsForQueue"},
     * looking it up on first use.
     *
     * @return the cached {@link JmsTemplate}
     */
    public JmsTemplate getJmsTemplate(){
        if (jmsTemplate != null){
            return jmsTemplate;
        }
        jmsTemplate = (JmsTemplate) applicationContextUtil.getBeanFromAppContext("jmsForQueue");
        return jmsTemplate;
    }
}

Worker.java

/**
 * JMS listener that receives messages from the destination named by
 * {@code appProperties.fromTopicName} and hands each one to {@link JmsSender}.
 */
@Component
public class Worker {

    @Autowired
    private JmsSender jmsSender;

    /**
     * Logs the incoming message to standard output and forwards it via
     * {@link JmsSender#send(String)}.
     *
     * @param jsonMessage the message body as delivered by the listener container
     * @return the value of {@code response} (see the review note in the body)
     * @throws JMSException declared by the signature; nothing in this body throws it directly
     */
    @JmsListener(destination = "#{appProperties.fromTopicName}")
    public String receiveMessageFromTopic(final String jsonMessage) throws JMSException {
        final String logLine = "Received message " + jsonMessage;
        System.out.println(logLine);

        jmsSender.send(jsonMessage);

        // NOTE(review): `response` is not declared anywhere in the visible code, so this
        // does not compile as shown. Presumably part of the class was trimmed; confirm
        // what should be returned (a non-void @JmsListener result is sent as a JMS reply).
        return response;
    }
}

QueueConfig.xml

<?xml version="1.0" encoding="UTF-8"?>
   <beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:aop="http://www.springframework.org/schema/aop"
   xmlns:context="http://www.springframework.org/schema/context"
   xmlns:tx="http://www.springframework.org/schema/tx"
   xmlns:mvc="http://www.springframework.org/schema/mvc"
   xmlns:task="http://www.springframework.org/schema/task"
   xmlns:amq="http://activemq.apache.org/schema/core"
   xmlns:cache="http://www.springframework.org/schema/cache"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
   xmlns:jms="http://www.springframework.org/schema/jms"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
   http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
   http://www.springframework.org/schema/aop
   http://www.springframework.org/schema/aop/spring-aop-4.3.xsd
   http://www.springframework.org/schema/context
   http://www.springframework.org/schema/context/spring-context-4.3.xsd
   http://www.springframework.org/schema/tx
   http://www.springframework.org/schema/tx/spring-tx-4.3.xsd
   http://activemq.apache.org/schema/core
   http://activemq.apache.org/schema/core/activemq-core-5.4.0.xsd
   http://www.springframework.org/schema/task
   http://www.springframework.org/schema/task/spring-task-4.3.xsd
   http://www.springframework.org/schema/mvc
   http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsd
   http://www.springframework.org/schema/cache
   http://www.springframework.org/schema/cache/spring-cache.xsd
   http://www.springframework.org/schema/integration
   http://www.springframework.org/schema/integration/spring-integration.xsd
   http://www.springframework.org/schema/integration/jms
   http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.3.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"
   default-lazy-init="false">     

   <!-- Broker URL as a plain String bean, resolved from appProperties.queueUrl. -->
   <bean id="brokerUrl" class="java.lang.String">
       <constructor-arg value="#{appProperties.queueUrl}"/>
   </bean>

   <!-- Raw ActiveMQ connection factory with asynchronous dispatch enabled.
        NOTE(review): "#brokerUrl" looks like the xbean "#id" bean-reference shorthand
        pointing at the brokerUrl bean above; confirm it resolves as intended. -->
   <amq:connectionFactory id="amqConnectionFactory" brokerURL="#brokerUrl" dispatchAsync="true"/>

   <!-- Pooled wrapper around amqConnectionFactory; stop() is called when the context
        shuts down. Pool size and idle timeout come from appProperties, and each
        pooled connection is limited to 10 active sessions. -->
   <bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
       <constructor-arg ref="amqConnectionFactory"/>
       <property name="maxConnections" value="#{appProperties.maxConnections}"/>
       <property name="idleTimeout" value="#{appProperties.idleTimeout}"/>
       <property name="maximumActiveSessionPerConnection" value = "10"/> 

   </bean>

   <!-- JmsTemplate built on the pooled factory. JmsSender looks this bean up by the
        name "jmsForQueue". pubSubDomain is not set here, so the template keeps its
        default domain (presumably point-to-point, i.e. queues; verify against the
        Spring JmsTemplate documentation). -->
   <bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
       <constructor-arg ref="connectionFactory1"/>
   </bean>

   <!-- ActiveMQ queue destination named by appProperties.toQueueName. Nothing in the
        code shown here references this bean by id. -->
   <bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
       <constructor-arg value="#{appProperties.toQueueName}"/>
   </bean>
</beans>

Solution

  • The behavior you're seeing is expected. A JMS topic follows publish/subscribe (i.e. pub/sub) semantics where all subscribers get all the messages sent to the topic. In your case, you have 10 subscribers and you're sending 50 messages. Each of these 10 subscribers receives each of the 50 messages (in accordance with pub/sub semantics) and then forwards it to a queue. Therefore, the queue receives 500 messages.

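    If you want each message to be delivered to only one of the consumers (i.e. the consumers split the messages between them rather than each receiving every message), then you shouldn't use a JMS topic but rather a JMS queue.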