Search code examples
javaamqpactivemq-artemis

How to Configure ActiveMQ Artemis Server to Interoperate Messages Across Different Procotols AMQP and Openwire


I have an ActiveMQ Artemis server that is configured like so:

public static void updateConfig(Configuration config) {
    try {
        config.setPersistenceEnabled(false)
              .setSecurityEnabled(false)
              .addAcceptorConfiguration("tcp", "tcp://localhost:61616")
              .addAcceptorConfiguration("amqp", "tcp://localhost:5672");
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}

It can successfully listen to topics from my nodejs pub-sub applications using AMQP and my java applications which use openwire. The topic being used is TestTopic.

While this works neither of the protocols seem to federate messages across each other despite being received by the server.

If a message is published through TestTopic using AMQP on my NodeJS client the server receives the message and successfully broadcasts to NodeJS client subscribers, but other listeners on OpenWire from the Java apps will never observe the message. Similarly, Java OpenWire clients can send and receive to one another but Java client messages are never observed by NodeJS clients.

JMS server sees successful messages on both protocols but never broadcasts to all listeners/receivers on TestTopic.

It seems I may be missing configuration in my ActiveMQ Artemis server.

The documentation says the following:

You should prefix destination address with queue:// to use queue based destinations or topic:// to use topic based destinations. The destination type defaults to queue when the destination prefix is omitted.

I have NodeJS destination topics configured like so 'topic://TestTopic'.

// subscriber.js
var args = require('./options.js').options({
  'client': { default: 'my-client', describe: 'name of identifier for client container'},
  'subscription': { default: 'my-subscription', describe: 'name of identifier for subscription'},
  't': { alias: 'topic', default: 'topic://TestTopic', describe: 'name of topic to subscribe to'},
  'h': { alias: 'host', default: 'localhost', describe: 'dns or ip name of server where you want to connect'},
  'p': { alias: 'port', default: 5672, describe: 'port to connect to'}
}).help('help').argv;

var connection = require('rhea').connect({ port:args.port, host: args.host, container_id:args.client });
connection.on('receiver_open', function (context) {
  console.log('subscribed');
});
connection.on('message', function (context) {
  if (context.message.body === 'detach') {
      // detaching leaves the subscription active, so messages sent
      // while detached are kept until we attach again
      context.receiver.detach();
      context.connection.close();
  } else if (context.message.body === 'close') {
      // closing cancels the subscription
      context.receiver.close();
      context.connection.close();
  } else {
      console.log(context.message.body);
  }
});
// the identity of the subscriber is the combination of container id
// and link (i.e. receiver) name
connection.open_receiver({name:args.subscription, source:{address:args.topic, durable:2, expiry_policy:'never'}});

In the Java listener the topic is configured as TestTopic like below:

// client-sender.java
@JmsListener(destination = "TestTopic", selector = "${selector}")
public void receiveMessage(Message message) {
    String type = (message instanceof MapMessage) ? "MapMessage" : "TextMessage";
    try {
        logMessageBody(message);
        }
    } catch (Exception ex) {
        logger.error("Exception caught while logging the received message: ");
        logger.error(ex + ": " + ex.getCause());
    }
}

Solution

  • Exchanging messages between protocols is definitely supported and will work with the proper configuration.

    It's worth noting that the documentation you cited is actually for ActiveMQ Classic rather than ActiveMQ Artemis. To be clear, you can use prefixes with ActiveMQ Artemis, but they are configurable (unlike in ActiveMQ Classic), and they must be specified on your acceptor URL in order to be applied. This is discussed in the actual ActiveMQ Artemis documentation. In your case you need this:

    public static void updateConfig(Configuration config) {
        try {
            config.setPersistenceEnabled(false)
                  .setSecurityEnabled(false)
                  .addAcceptorConfiguration("tcp", "tcp://localhost:61616")
                  .addAcceptorConfiguration("amqp", "tcp://localhost:5672?anycastPrefix=queue://;multicastPrefix=topic://");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    

    To be clear, JMS queue and topic destinations are automatically mapped to anycast and multicast respectively.

    Once both clients are using compatible routing-types then messages should be able to flow back and forth between them. Keep in mind that pub/sub (i.e. JMS topic) semantics require that the subscriber is created before any messages are sent. Messages sent when no subscriber exists are simply discarded.