Search code examples
activemq-classicbroker

What is the use case of BrokerService in ActiveMQ and how to use it correctly


I am new about ActiveMQ. I'm trying to study and check how it works by checking the example code provided by Apache at this link:-

http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html

    public class Server implements MessageListener {
        private static int ackMode;
        private static String messageQueueName;
        private static String messageBrokerUrl;

        private Session session;
        private boolean transacted = false;
        private MessageProducer replyProducer;
        private MessageProtocol messageProtocol;

        static {
            messageBrokerUrl = "tcp://localhost:61616";
            messageQueueName = "client.messages";
            ackMode = Session.AUTO_ACKNOWLEDGE;
        }

        public Server() {
            try {
                //This message broker is embedded
                BrokerService broker = new BrokerService();
                broker.setPersistent(false);
                broker.setUseJmx(false);
                broker.addConnector(messageBrokerUrl);
                broker.start();
            } catch (Exception e) {
                System.out.println("Exception: "+e.getMessage());
                //Handle the exception appropriately
            }

            //Delegating the handling of messages to another class, instantiate it before setting up JMS so it
            //is ready to handle messages
            this.messageProtocol = new MessageProtocol();
            this.setupMessageQueueConsumer();
        }

        private void setupMessageQueueConsumer() {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
            Connection connection;
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                this.session = connection.createSession(this.transacted, ackMode);
                Destination adminQueue = this.session.createQueue(messageQueueName);

                //Setup a message producer to respond to messages from clients, we will get the destination
                //to send to from the JMSReplyTo header field from a Message
                this.replyProducer = this.session.createProducer(null);
                this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                //Set up a consumer to consume messages off of the admin queue
                MessageConsumer consumer = this.session.createConsumer(adminQueue);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                System.out.println("Exception: "+e.getMessage());
            }
        }

        public void onMessage(Message message) {
            try {
                TextMessage response = this.session.createTextMessage();
                if (message instanceof TextMessage) {
                    TextMessage txtMsg = (TextMessage) message;
                    String messageText = txtMsg.getText();
                    response.setText(this.messageProtocol.handleProtocolMessage(messageText));
                }

                //Set the correlation ID from the received message to be the correlation id of the response message
                //this lets the client identify which message this is a response to if it has more than
                //one outstanding message to the server
                response.setJMSCorrelationID(message.getJMSCorrelationID());

                //Send the response to the Destination specified by the JMSReplyTo field of the received message,
                //this is presumably a temporary queue created by the client
                this.replyProducer.send(message.getJMSReplyTo(), response);
            } catch (JMSException e) {
                System.out.println("Exception: "+e.getMessage());
            }
        }

        public static void main(String[] args) {
            new Server();
        }
    }

My confusion about the messageBrokerUrl = "tcp://localhost:61616"; You know ActiveMQ service is running on port 61616 by default. Why does this example chooses same port. If I try to run the code thows eception as: Exception: Failed to bind to server socket: tcp://localhost:61616 due to: java.net.BindException: Address already in use: JVM_Bind

Perhaps if I change the port number, I can execute the code.

Please let me know why it is like this in the example and how to work with BrokerService.


Solution

  • The BrokerService in this example is trying to create an in memory ActiveMQ broker for use in the example. Given the error you are seeing I'd guess you already have an ActiveMQ broker running on the machine that is bound to port 61616 as that's the default port and thus the two are conflicting. You could either stop the external broker and run the example or modify the example to not run the embedded broker and just rely on your external broker instance.

    Embedded brokers are great for unit testing or for creating examples that don't require the user to have a broker installed and running.