Search code examples
springjmsactivemq-classicjmstemplate

JMS Topic Subscriber in Spring using JMS template/ Message Subscriber


I have a simple Spring application for JMS Producer/Subscriber using ActiveMQ with below configuration :

Application Context xml :

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
    <property name="userName" value="user" />
    <property name="password" value="password" />
</bean>
<bean id="messageDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="messageQueue1" />
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE">
    </property>
</bean>

<bean id="springJmsProducer" class="SpringJmsProducer">
    <property name="destination" ref="messageDestination" />
    <property name="jmsTemplate" ref="jmsTemplate" />
</bean>

<bean id="springJmsConsumer" class="SpringJmsConsumer">
    <property name="destination" ref="messageDestination" />
    <property name="jmsTemplate" ref="jmsTemplate" />
</bean>

and Below is Spring producer

public class SpringJmsProducer {
private JmsTemplate jmsTemplate;
private Destination destination;

public JmsTemplate getJmsTemplate() {
    return jmsTemplate;
}

public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
}

public Destination getDestination() {
    return destination;
}

public void setDestination(Destination destination) {
    this.destination = destination;
}

public void sendMessage(final String msg) {
    jmsTemplate.send(destination, new MessageCreator() {
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(msg);
        }});        
 }
}

below is Spring Consumer:

public class SpringJmsConsumer {
private JmsTemplate jmsTemplate;
private Destination destination;

public JmsTemplate getJmsTemplate() {
    return jmsTemplate;
}

public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
}

public Destination getDestination() {
    return destination;
}

public void setDestination(Destination destination) {
    this.destination = destination;
}

public String receiveMessage() throws JMSException {
    TextMessage textMessage =(TextMessage) jmsTemplate.receive(destination);        
    return textMessage.getText();
 }
}

Issue : When i start producer and post messages, and then i start consumer, Consumer is not reading old messages but only reading messages posted after consumer was started. Could anyone please help me how to make this durable subscriber so that messages in queue which are not acknowledged should be read by consumer and also i need to implement Synchronous Consumer not Asynchronous.

I have tried all possible solution but none is working. Any help is highly appreciated


Solution

  • if you want consumer receive messages sent to the topic before he starts you have 2 choice :

    1. Use Activemq Retroactive Consumer

    Background A retroactive consumer is just a regular JMS Topic consumer who indicates that at the start of a subscription every attempt should be used to go back in time and send any old messages (or the last message sent on that topic) that the consumer may have missed.

    See the Subscription Recovery Policy for more detail.

    You mark a Consumer as being retroactive as follows:

    topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true");
    

    http://activemq.apache.org/retroactive-consumer.html

    2. Use Durable Subscribers :

    Note that the Durable Subscriber receive messages sent to the topic before he starts at the 2nd run

    http://activemq.apache.org/manage-durable-subscribers.html

    This is possible with DefaultMessageListenerContainer Asynchronously

    <bean id="jmsContainer" destroy-method="shutdown"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="messageDestination" />
        <property name="messageListener" ref="messageListenerAdapter" />
        <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
        <property name="subscriptionDurable" value="true" />
        <property name="clientId" value="UniqueClientId" />
    </bean>
    
    <bean id="messageListenerAdapter"
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="springJmsConsumer" />
    </bean>
    <bean id="springJmsConsumer" class="SpringJmsConsumer">
    </bean>
    

    AND Update your consumer :

    public class SpringJmsConsumer implements javax.jms.MessageListener {
    
        public void onMessage(javax.jms.Message message) {
            // treat message;
            message.acknowledge();
        }
    }
    

    UPDATE to use

    if you want a Synchronous Durable Subscriber, an example

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.TopicSubscriber;
    
    public class SpringJmsConsumer {
    
        private Connection conn;
        private TopicSubscriber topicSubscriber;
    
        public SpringJmsConsumer(ConnectionFactory connectionFactory, Topic destination ) {
            conn = connectionFactory.createConnection("user", "password");
            Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            topicSubscriber = session.createDurableSubscriber(destination, "UniqueClientId");
            conn.start();
        }
    
        public String receiveMessage() throws JMSException {
            TextMessage textMessage = (TextMessage) topicSubscriber.receive();
            return textMessage.getText();
        }
    }
    

    And update springJmsConsumer

    <bean id="springJmsConsumer" class="SpringJmsConsumer">
        <constructor-arg ref="connectionFactory" />
        <constructor-arg ref="messageDestination" />
    </bean>
    

    Note that connection failures are not managed by this code.