Search code examples
spring-bootactivemq-classic

Cannot publish a topic message to two subscribers simultaneously using ActiveMQ


I followed "SpringBoot application publish and read from ActiveMQ topic" to publish to a topic using ActiveMQ. I have created two receiver microservices that read messages from the topic, and I have also created a REST endpoint that publishes to the topic. However, I have to call this REST endpoint twice for both receivers to get a message:
1). The first execution of rest endpoint will send message to Receiver1
2). The second execution of rest endpoint will send message to Receiver2

Hence, the two receivers do not both read the same message from the topic; each published message reaches only one of them.
Here is my code.

PublisherApplication.java

package com.springboot;

//import statements

/**
 * Boot entry point of the publisher service.
 *
 * <p>Besides starting the application, this class contributes two beans: a JMS
 * listener container factory switched to publish/subscribe mode and a
 * {@code RestTemplate}.
 */
@SpringBootApplication
@EnableDiscoveryClient
@EnableJms
public class PublisherApplication {

    /**
     * Launches the Spring application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class, args);
    }

    /**
     * Creates the listener container factory used for topic destinations.
     *
     * @param connectionFactory the JMS connection factory handed to the configurer
     * @param configurer        applies Boot's defaults to the new factory
     * @return the configured factory with the pub-sub domain enabled
     */
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(
            ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        final DefaultJmsListenerContainerFactory topicFactory =
                new DefaultJmsListenerContainerFactory();
        // Boot's defaults (message converter included) are applied first ...
        configurer.configure(topicFactory, connectionFactory);
        // ... and only then is the factory marked as targeting topics.
        topicFactory.setPubSubDomain(true);
        return topicFactory;
    }

    /**
     * Exposes a {@code RestTemplate} built from the supplied builder.
     *
     * @param builder the builder used to create the template
     * @return the built template
     */
    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        final RestTemplate template = builder.build();
        return template;
    }

}

PublishMessage.java
[Rest-end point to publish Topic]

package com.springboot.controller;

//import statements

/**
 * REST controller whose single endpoint sends a fixed text message to the
 * {@link #MAILBOX_TOPIC} destination through the injected {@code JmsTemplate}.
 *
 * <p>NOTE(review): the template is used exactly as injected; nothing in this class
 * enables pub-sub mode on it, so the destination is presumably resolved as a queue
 * rather than a topic. Confirm against the JMS configuration.
 */
@RestController
@RequestMapping(path = "/schoolDashboard/topic")
class PublishMessage {

    /** Name of the JMS destination the message is sent to. */
    public static final String MAILBOX_TOPIC = "mailbox.topic";

    @Autowired
    private JmsTemplate jmsTemplate;

    /**
     * Handles {@code GET /schoolDashboard/topic/sendEmail}: logs to standard output,
     * then converts and sends the fixed payload.
     *
     * @throws Exception declared by the signature; propagated to the caller
     */
    @GetMapping(path = "/sendEmail")
    public void sendStudentById() throws Exception {
        final String payload = "Topic - Email Sent";
        System.out.println("Anindya-TopicSendMessage.java :: Publishing Email sent....");
        this.jmsTemplate.convertAndSend(MAILBOX_TOPIC, payload);
    }

}

ReceiverApplication01
[Note - Receiver01 is first microservice]

package com.springboot;

//import statements

/**
 * Boot entry point of the first receiver service.
 *
 * <p>Registers the listener container factory that JMS listeners in this service
 * can reference by the bean name {@code topicListenerFactory}.
 */
@SpringBootApplication
@EnableJms
public class ReceiverApplication01 {

    /**
     * Launches the Spring application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ReceiverApplication01.class, args);
    }

    /**
     * Creates the listener container factory used for topic destinations.
     *
     * @param connectionFactory the JMS connection factory handed to the configurer
     * @param configurer        applies Boot's defaults to the new factory
     * @return the configured factory with the pub-sub domain enabled
     */
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(
            ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        final DefaultJmsListenerContainerFactory topicFactory =
                new DefaultJmsListenerContainerFactory();
        // Boot's defaults (message converter included) go in first ...
        configurer.configure(topicFactory, connectionFactory);
        // ... then the factory is flagged as listening on topics.
        topicFactory.setPubSubDomain(true);
        return topicFactory;
    }

}

TopicMesssgeReceiver01.java
[Receiver01 Read message from Topic]

package com.springboot.message;

//import statement

/**
 * JMS listener of the first receiver service: prints every message that arrives
 * on {@link #MAILBOX_TOPIC}.
 */
@Component
public class TopicMesssgeReceiver01 {

    /** Destination this listener is attached to. */
    public static final String MAILBOX_TOPIC = "mailbox.topic";

    /** Turns the raw JMS message into a plain Java object for printing. */
    private final SimpleMessageConverter messageConverter = new SimpleMessageConverter();

    /**
     * Converts the incoming message and writes it to standard output.
     *
     * @param message the JMS message delivered by the listener container
     * @throws JMSException if the message content cannot be read
     */
    @JmsListener(destination = MAILBOX_TOPIC, containerFactory = "topicListenerFactory")
    public void receiveMessage(final Message message) throws JMSException {
        final Object payload = messageConverter.fromMessage(message);
        System.out.println("Receiver01 <" + payload + ">");
    }

}

ReceiverApplication02
[Note:-Receiver02 is second microservice]

package com.springboot;

//import statement

/**
 * Boot entry point of the second receiver service.
 *
 * <p>Registers the listener container factory bean {@code topicListenerFactory}.
 */
@SpringBootApplication
@EnableJms
public class ReaderApplication02 {

    /**
     * Creates the listener container factory intended for topic destinations.
     *
     * <p>NOTE(review): here {@code setPubSubDomain(true)} runs BEFORE
     * {@code configurer.configure(...)}. The configurer presumably applies Boot's own
     * pub-sub setting (queue mode unless configured otherwise) and thereby overwrites
     * the flag, leaving this factory listening on a queue. Confirm against
     * {@code DefaultJmsListenerContainerFactoryConfigurer}; the flag should be set
     * after {@code configure}.
     *
     * @param connectionFactory the JMS connection factory handed to the configurer
     * @param configurer        applies Boot's defaults to the new factory
     * @return the configured listener container factory
     */
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // Pub-sub flag is set first ...
        factory.setPubSubDomain(true);
        // ... and Boot's configuration is applied afterwards (see NOTE above).
        configurer.configure(factory, connectionFactory);       
        return factory;
    }

    /**
     * Launches the Spring application.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ReaderApplication02.class, args);
    }

}

TopicMesssgeReceiver02
[Receiver02 Read message from Topic]


package com.springboot.message;

//import statement

/**
 * JMS listener of the second receiver service: prints every message that arrives
 * on {@link #MAILBOX_TOPIC}.
 */
@Component
public class TopicMesssgeReceiver02 {

    /** Destination this listener is attached to. */
    public static final String MAILBOX_TOPIC = "mailbox.topic";

    /** Turns the raw JMS message into a plain Java object for printing. */
    private final SimpleMessageConverter messageConverter = new SimpleMessageConverter();

    /**
     * Converts the incoming message and writes it to standard output.
     *
     * @param message the JMS message delivered by the listener container
     * @throws Exception declared by the signature; conversion failures propagate
     */
    @JmsListener(destination = MAILBOX_TOPIC, containerFactory = "topicListenerFactory")
    public void receiveMessage(final Message message) throws Exception {
        final Object payload = messageConverter.fromMessage(message);
        System.out.println("Receiver02 <" + payload + ">");
    }

}


Solution

  • Thanks Naveen!! Finally, I am able to do it.
    We only have to call setPubSubDomain(true); Spring Boot takes care of all the boilerplate code.
    Now the two receiver microservices can read messages from the topic simultaneously.
    The code changes are as follows:

    PublishMessage.java
    [Rest-end point to publish Topic]

    package com.springboot.controller;
    
    //import statements
    
    /**
     * REST controller whose single endpoint publishes a fixed text message to the
     * {@link #MAILBOX_TOPIC} topic.
     *
     * <p>The injected {@code JmsTemplate} is switched to pub-sub mode once, when this
     * controller is constructed, instead of on every HTTP request: the template is
     * shared, so its configuration should not be mutated from request-handling code.
     */
    @RestController
    @RequestMapping(path = "/schoolDashboard/topic")
    class PublishMessage {

        /** Name of the JMS topic the message is published to. */
        public static final String MAILBOX_TOPIC = "mailbox.topic";

        private final JmsTemplate jmsTemplate;

        /**
         * Stores the template and enables pub-sub mode on it.
         *
         * @param jmsTemplate the template used for publishing
         */
        @Autowired
        PublishMessage(JmsTemplate jmsTemplate) {
            /* setPubSubDomain(true) identifies Topic in ActiveMQ */
            jmsTemplate.setPubSubDomain(true);
            this.jmsTemplate = jmsTemplate;
        }

        /**
         * Handles {@code GET /schoolDashboard/topic/sendEmail}: logs to standard
         * output, then converts and publishes the fixed payload to the topic.
         *
         * @throws Exception declared by the signature; propagated to the caller
         */
        @GetMapping(path = "/sendEmail")
        public void sendStudentById() throws Exception {
            System.out.println("Publisher :: Message sent...");
            jmsTemplate.convertAndSend(MAILBOX_TOPIC, "Topic - Email Sent");
        }

    }
    

    ReceiverApplication02
    [Note:-Receiver02 is second microservice]

    package com.springboot;
    
    //import statement
    
    /**
     * Boot entry point of the second receiver service.
     *
     * <p>Registers the listener container factory that JMS listeners in this service
     * can reference by the bean name {@code topicListenerFactory}.
     */
    @SpringBootApplication
    @EnableJms
    public class ReaderApplication02 {

        /**
         * Launches the Spring application.
         *
         * @param args command-line arguments, forwarded unchanged to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication.run(ReaderApplication02.class, args);
        }

        /**
         * Creates the listener container factory used for topic destinations.
         *
         * @param connectionFactory the JMS connection factory handed to the configurer
         * @param configurer        applies the configuration to the new factory
         * @return the configured factory with the pub-sub domain enabled
         */
        @Bean
        public JmsListenerContainerFactory<?> topicListenerFactory(
                ConnectionFactory connectionFactory,
                DefaultJmsListenerContainerFactoryConfigurer configurer) {
            final DefaultJmsListenerContainerFactory topicFactory =
                    new DefaultJmsListenerContainerFactory();
            configurer.configure(topicFactory, connectionFactory);
            // Order matters: the pub-sub flag must be set only after the factory
            // has been configured.
            topicFactory.setPubSubDomain(true);
            return topicFactory;
        }

    }