Search code examples
javastreamactivemq-classicapache-stormjms-topic

How to integrate jms topic to feed Storm spout


I have an ActiveMQ topic provider. I need to feed the data received from that topic into the Storm topic. is there any way to do it directly or should I create intermediate queue and feed topic data into the queue and then pull the data into the spout. Which is the best option?


Solution

  • I have went through the ptgoetz's Storm JMS Examples and made a solution to directly feed the topic data to a spout.

    need to specify the topic in jms-activemq.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans 
      xmlns="http://www.springframework.org/schema/beans" 
      xmlns:amq="http://activemq.apache.org/schema/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    
        <amq:topic id="topic" physicalName="myTopic" />
    
        <amq:connectionFactory id="jmsConnectionFactory"
            brokerURL="tcp://localhost:61616" />
    </beans>
    

    then we can create JmsSpout like bellow with the Jms Acknowledge Mode in session.AUTO_ACKNOWLEDGE

    JmsProvider jmsTopicProvider = new SpringJmsProvider("jms-activemq.xml", "jmsConnectionFactory", "topic");
    
    JmsTupleProducer producer = new JsonTupleProducer();
    
    JmsSpout topicSpout = new JmsSpout();
    topicSpout.setJmsProvider(jmsTopicProvider);
    topicSpout.setJmsTupleProducer(producer);
    topicSpout.setJmsAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
    topicSpout.setDistributed(false);