Tags: activemq-classic, messagebroker, jms-topic, virtual-topic

Routing to virtual destinations inside ActiveMQ broker


I have an ActiveMQ configuration with a virtual destination and a normal topic.

I want to route all JMS messages sent to the destination (VirtualTopic.Notifications) to 2 queues (VirtualTopic.SMS, VirtualTopic.EMAIL) based on the JMSType in the message header.

I also want the normal topic (VirtualTopic.gps) to work as usual.

This is my activemq.xml configuration. The queues Consumer.SMS.VirtualTopic and Consumer.EMAIL.VirtualTopic are created here.

    <destinations>
        <queue physicalName="Consumer.SMS.VirtualTopic" />
        <queue physicalName="Consumer.EMAIL.VirtualTopic" />
    </destinations>

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <compositeQueue name="VirtualTopic.Notifications" forwardOnly="false">
            <forwardTo>
              <filteredDestination selector="JMSType = 'SMS'" queue="Consumer.SMS.VirtualTopic"/>
              <filteredDestination selector="JMSType = 'EMAIL'" queue="Consumer.EMAIL.VirtualTopic"/>
            </forwardTo>
          </compositeQueue>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
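
To make the intended routing concrete, here is a small broker-independent model of what the two filteredDestination entries are expected to do. It is only a sketch: the class NotificationRouting and its method are hypothetical, it does not use the ActiveMQ or JMS API, and it ignores forwardOnly="false", which as far as I understand additionally keeps a copy on the original destination.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, broker-independent model of the forwarding rules above. */
final class NotificationRouting {
    // JMSType value -> target queue, mirroring the two filteredDestination entries.
    private static final Map<String, String> RULES = new LinkedHashMap<>();
    static {
        RULES.put("SMS", "Consumer.SMS.VirtualTopic");
        RULES.put("EMAIL", "Consumer.EMAIL.VirtualTopic");
    }

    /** Returns the queues a message with the given JMSType would be forwarded to. */
    static List<String> targetsFor(String jmsType) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> rule : RULES.entrySet()) {
            // A selector such as JMSType = 'SMS' is an exact, case-sensitive match;
            // a message without a JMSType (null) matches no rule.
            if (rule.getKey().equals(jmsType)) {
                targets.add(rule.getValue());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        System.out.println("SMS   -> " + targetsFor("SMS"));
        System.out.println("EMAIL -> " + targetsFor("EMAIL"));
        System.out.println("PUSH  -> " + targetsFor("PUSH"));
    }
}
```

A message whose JMSType is neither SMS nor EMAIL ends up in neither queue, which is worth keeping in mind when producers forget to set the header.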

The consumers and the topic (VirtualTopic.gps) are created from the server-side code:

    // Returns a cached producer for the topic, creating it on first use.
    private static MessageProducer getTopicProducer(String topicName) throws JMSException {
        MessageProducer producer = topicProducers.get(topicName);

        if (producer == null) {
            logger.info("Creating message producer for Topic : {}", topicName);
            Destination destination = session.createTopic(topicName);

            // Consumer queue names come from the jms.topic.consumer.list property.
            List<String> queueNames = PropertyReader
                    .getPropertyStringList("jms.topic.consumer.list", JMSProducer.properties);
            if (queueNames != null) {
                for (String queueName : queueNames) {
                    // Open and immediately close a consumer on each queue,
                    // apparently so that the broker creates the queue up front.
                    Queue virtualQueue = session.createQueue(queueName);
                    MessageConsumer con = session.createConsumer(virtualQueue);
                    con.close();
                }
            }

            producer = session.createProducer(destination);
            topicProducers.put(topicName, producer);
        }

        return producer;
    }
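
For context, here is a sketch of the naming rule that virtual topics rely on by default: a message published to the topic VirtualTopic.gps is copied to every queue named Consumer.<name>.VirtualTopic.gps. The helper class and the consumer name used below are hypothetical; the actual entries in jms.topic.consumer.list are not shown in the question.

```java
/** Hypothetical helper illustrating ActiveMQ's default virtual topic naming. */
final class VirtualTopicNames {
    private static final String TOPIC_PREFIX = "VirtualTopic.";
    private static final String QUEUE_PREFIX = "Consumer.";

    /** True if the topic name falls under the default VirtualTopic.> pattern. */
    static boolean isVirtualTopic(String topicName) {
        return topicName.startsWith(TOPIC_PREFIX)
                && topicName.length() > TOPIC_PREFIX.length();
    }

    /** Builds the queue a named consumer reads from: Consumer.<name>.<topic>. */
    static String consumerQueue(String consumerName, String topicName) {
        if (!isVirtualTopic(topicName)) {
            throw new IllegalArgumentException("Not a virtual topic: " + topicName);
        }
        return QUEUE_PREFIX + consumerName + "." + topicName;
    }

    public static void main(String[] args) {
        // "tracker" is a made-up consumer name used only for illustration.
        System.out.println(consumerQueue("tracker", "VirtualTopic.gps"));
    }
}
```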

With this setup, all messages sent to VirtualTopic.Notifications are routed to the 2 queues, and consumers can pick up messages from their respective queues.

The issue is that the messages sent to VirtualTopic.gps are filtered as well, so the consumers can't consume the gps messages.


Solution

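  • Thank you so much, Hassen.

    Adding the line <virtualTopic name=">" selectorAware="false" /> to activemq.xml did the trick. Declaring a custom virtualDestinationInterceptor apparently replaces the broker's default virtual topic setup, so VirtualTopic.gps stops behaving as a virtual topic unless a virtualTopic entry is declared explicitly: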

        <destinationInterceptors>
            <virtualDestinationInterceptor>
                <virtualDestinations>
                    <compositeQueue name="VirtualTopic.Notifications"
                        forwardOnly="false">
                        <forwardTo>
                            <filteredDestination selector="JMSType = 'SMS'"
                                queue="Consumer.SMS.VirtualTopic" />
                            <filteredDestination selector="JMSType = 'EMAIL'"
                                queue="Consumer.EMAIL.VirtualTopic" />
                        </forwardTo>
                    </compositeQueue>
                    <virtualTopic name=">" selectorAware="false" />
                </virtualDestinations>
            </virtualDestinationInterceptor>
        </destinationInterceptors>
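
    To illustrate why name=">" also covers VirtualTopic.gps, here is a simplified model of ActiveMQ's destination wildcards, where "." separates name elements, "*" matches exactly one element, and ">" matches everything that follows. The class below is a hypothetical sketch for reasoning about patterns, not the broker's actual matching code.

```java
/** Hypothetical, simplified model of ActiveMQ destination wildcard matching. */
final class DestinationWildcards {
    /** Returns true if the destination name is covered by the pattern. */
    static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                return true;  // '>' accepts all remaining elements
            }
            if (i >= d.length) {
                return false; // pattern is longer than the destination
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // literal element differs
            }
        }
        // Without a trailing '>', both names need the same number of elements.
        return p.length == d.length;
    }

    public static void main(String[] args) {
        System.out.println(matches(">", "VirtualTopic.gps"));
        System.out.println(matches("VirtualTopic.>", "VirtualTopic.gps"));
        System.out.println(matches("VirtualTopic.Notifications", "VirtualTopic.gps"));
    }
}
```

    Note that name=">" turns every topic on the broker into a virtual topic. If only topics under VirtualTopic. should behave that way, a narrower pattern such as VirtualTopic.> may be enough, though that variant was not tested here.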