Search code examples
apache-camelactivemq-classicapache-servicemix

Routing queues with Service Mix


I'm trying to configure Apache ServiceMix to implement the network topology below:

enter image description here

I'm very new in Service Mix, Camel, ActiveMq etc. stuff and the main issue I'm trying to solve is to route messages from Output queue of DC1.ActiveMqBroker to the Input queue of DC2.ActiveMqBroker.

I'm sure this should be easy. Could someone point me to the good article or write rough snippet of configuration (in Spring/Blueprint, doesn't matter)?

UPDATE:

Sorry for long text but I don't see other way to tell my problem.

My sample configuration:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="dc2" dataDirectory="${karaf.data}/activemq/dc2" useShutdownHook="false">
    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry queue="input" producerFlowControl="true" memoryLimit="1mb"/>
                <policyEntry queue="output" producerFlowControl="true" memoryLimit="1mb"/>
            </policyEntries>
        </policyMap>
    </destinationPolicy> 
    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext>
    <persistenceAdapter>
        <kahaDB directory="${karaf.data}/activemq/dc2/kahadb"/>
    </persistenceAdapter>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://localhost:61619"/>
    </transportConnectors>
</broker>

<bean id="dc1activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:61619" />
</bean>

<bean id="dc2activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="tcp://localhost:61618" />
</bean>

<camelContext xmlns="http://camel.apache.org/schema/blueprint">
    <route>
        <from uri="dc1activemq:queue:output"/>
        <log message="Took message from dc1 to dc2"/>
        <to uri="dc2activemq:queue:input"/>
    </route>
</camelContext>

And I constantly get following error:

08:06:40,739 | INFO  | rint Extender: 3 | Activator                        | ?                                   ? | 68 - or
g.apache.camel.camel-core - 2.6.0 | Found 1 @Converter classes to load
08:06:40,740 | INFO  | rint Extender: 3 | Activator                        | ?                                   ? | 68 - or
g.apache.camel.camel-core - 2.6.0 | Found 1 @Converter classes to load
08:06:40,741 | INFO  | rint Extender: 3 | Activator                        | ?                                   ? | 68 - or
g.apache.camel.camel-core - 2.6.0 | Found 1 @Converter classes to load
08:06:40,741 | INFO  | rint Extender: 3 | Activator                        | ?                                   ? | 68 - or
g.apache.camel.camel-core - 2.6.0 | Found 2 @Converter classes to load
08:06:40,749 | INFO  | rint Extender: 3 | Activator                        | ?                                   ? | 68 - or
g.apache.camel.camel-core - 2.6.0 | Found 13 @Converter classes to load
08:06:40,754 | INFO  | rint Extender: 3 | BlueprintCamelContext            | ?                                   ? | 68 - or
g.apache.camel.camel-core - 2.6.0 | JMX enabled. Using ManagedManagementStrategy.
08:06:40,758 | INFO  | rint Extender: 3 | BlueprintCamelContext            | ?                                   ? | 68 - or
g.apache.camel.camel-core - 2.6.0 | Apache Camel 2.6.0 (CamelContext: 211-camel-165) is starting
08:06:42,364 | INFO  | rint Extender: 3 | BlueprintCamelContext            | ?                                   ? | 68 - or
g.apache.camel.camel-core - 2.6.0 | Route: route55 started and consuming from: Endpoint[dc1activemq://queue:output]
08:06:42,364 | INFO  | rint Extender: 3 | BlueprintCamelContext            | ?                                   ? | 68 - or
g.apache.camel.camel-core - 2.6.0 | Total 1 routes, of which 1 is started.
08:06:42,365 | INFO  | rint Extender: 3 | BlueprintCamelContext            | ?                                   ? | 68 - or
g.apache.camel.camel-core - 2.6.0 | Apache Camel 2.6.0 (CamelContext: 211-camel-165) started in 1.606 seconds
08:06:48,379 | WARN  | tenerContainer-1 | DefaultMessageListenerContainer  | ?                                   ? | 77 - or
g.springframework.jms - 3.0.5.RELEASE | Could not refresh JMS Connection for destination 'output' - retrying in 5000 ms. Cau
se: Could not connect to broker URL: tcp://localhost:61619. Reason: java.net.ConnectException: Connection refused: connect
08:06:54,381 | WARN  | tenerContainer-1 | DefaultMessageListenerContainer  | ?                                   ? | 77 - or
g.springframework.jms - 3.0.5.RELEASE | Could not refresh JMS Connection for destination 'output' - retrying in 5000 ms. Cau
se: Could not connect to broker URL: tcp://localhost:61619. Reason: java.net.ConnectException: Connection refused: connect

Solution

  • I'm making an assumption that you've got a handle on how Camel is started up in ServiceMix and have a few (at least) proof-of-concept routes going.. if not, start there. Also assuming that you know the URLs of the 2 message brokers, and that they're set up already.

    Given that, one solution is to register two ActiveMQComponents:

    <bean id="dc1activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
       <property name="brokerURL" value="tcp://DC1BrokerLocation:12345" />
    </bean>
    
    <bean id="dc2activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
       <property name="brokerURL" value="tcp://DC2BrokerLocation:12345" />
    </bean>
    

    And then in your camel context:

    <camelContext xmlns="http://camel.apache.org/schema/blueprint">
       <route>
          <from uri="dc1activemq:queue:whateverTheOutQueueIsCalled"/>
          <log message="Took message from dc1 to dc2"/>
          <to uri="dc2activemq:queue:whateverTheInQueueIsCalled"/>
       </route>
       <route>
          <from uri="dc2activemq:queue:whateverTheOutQueueIsCalled"/>
          <log message="Took message from dc2 to dc1"/>
          <to uri="dc1activemq:queue:whateverTheInQueueIsCalled"/>
       </route>
    <camelContext>