Search code examples
javajmsactivemq-classic

ActiveMQ message not being recieved when sent via Java, but does work when using the web console


I have a service set up to send and receive messages via ActiveMQ 5.16.1, and I plan on implementing other messaging services later such as Apache Kafka.

When I call the send and receive functions below, the receive function stalls and I have to manually close the program. However, when I use the ActiveMQ web console to send a message the receive function works.

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import arc.ipc.IService;

public class ActiveMQService<K, V> implements IService<K, V> {
    private String brokerAddress;
    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;

    public ActiveMQService(String brokerAddress) {
        this.brokerAddress = brokerAddress;
    }

    @Override
    public void send(String topic,V value) throws JMSException {
        Destination destination = session.createTopic(topic);
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = session.createTextMessage(value.toString());
        producer.send(message);
    }

    @Override
    public String receive(String topic) throws JMSException {
        Destination destination = session.createTopic(topic);
        MessageConsumer consumer = session.createConsumer(destination);
        Message message = consumer.receive();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            return textMessage.getText();
        }
        return null;
    }

    @Override
    public void connect() throws JMSException {
        connectionFactory = new ActiveMQConnectionFactory(brokerAddress);
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    }

    @Override
    public void disconnect() throws JMSException {
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

Solution

  • I believe the problem is that you're sending your message to a JMS topic and then creating a subscriber on the topic to the receive the message you just sent. JMS topics follow normal pub/sub semantics which means that the subscription must exist before a message is sent.

    Also, when the consumer doesn't receive a message it is simply blocking forever, i.e.:

    Message message = consumer.receive();
    

    You might consider using a timeout and logging an error or throwing an exception, e.g.:

    Message message = consumer.receive(500);
    if (message == null) {
       throw new IllegalStateException("Did not receive message!");
    }