How do I send messages from a Java program to a Javascript STOMP client using ActiveMQ Artemis?


I'm trying to communicate back and forth between a Java process and a browser UI. To do this, I configured a multicast address in Artemis. The messages from the Java process are not making it to the browser subscribers, which are implemented using StompJS.

To test, I wrote a Hello World type of program in Java and in Javascript to simplify the problem as much as I could. Each program starts a consumer on the address and then sends 100 messages to it. The consumers just log each message they receive.

The Java program receives all the messages that it produced and all the messages that the Javascript program produced. The Javascript program receives all the messages that it produced, but none of the messages that the Java program produced.

When I use the Artemis Console to look at the queue the STOMP client has created, it shows 200 messages added, 100 acknowledged, and 100 stuck in delivering.

If I use the Artemis Console to send a message to that address, that message is received by both the Java and Javascript programs.

Any ideas what I'm doing wrong here?

Javascript program:

import { Client } from "@stomp/stompjs"

let client;

export function testStomp() {
    client = new Client();
    client.brokerURL = "ws://messaging-host:7961";
    client.onConnect = handleConnected;
    client.activate();
}

function handleConnected() {
    client.subscribe("broadcastMessage", message => console.log(message.body));
    const message = { destination: "broadcastMessage" };
    for (let i = 0; i < 100; i++) {
        message.body = "Message from javascript " + i;
        client.publish(message);
    }
}

Java program:

package com.whatever;

import org.apache.activemq.artemis.api.core.*;
import org.apache.activemq.artemis.api.core.client.*;
import java.util.UUID;

public class TestArtemis {
    public static void main(String[] args) throws Exception {
        String url = "tcp://messaging-host:7961";
        ServerLocator serverLocator = ActiveMQClient.createServerLocator(url);
        ClientSession session = serverLocator.createSessionFactory().createSession();
        startConsumer(session);
        ClientProducer producer = session.createProducer();
        for (int i = 0; i < 100; i++) {
            String payload = "Message from java " + i;
            ClientMessage message = session.createMessage(false);
            message.getBodyBuffer().writeBytes(payload.getBytes());
            producer.send("broadcastMessage", message);
            Thread.sleep(1000);
        }
    }

    private static void startConsumer(ClientSession session) throws Exception{
        session.start();
        String queueName = UUID.randomUUID().toString();
        session.createQueue(new QueueConfiguration(queueName)
                .setAddress("broadcastMessage")
                .setDurable(false)
                .setRoutingType(RoutingType.MULTICAST)
                .setTemporary(true));
        ClientConsumer consumer = session.createConsumer(queueName);
        consumer.setMessageHandler(message -> {
            byte[] bytes = new byte[message.getBodyBuffer().readableBytes()];
            message.getDataBuffer().readBytes(bytes);
            System.out.println("Received " + new String(bytes));
        });
    }
}

Solution

  • ActiveMQ Artemis checks for the content-length header to decide whether a STOMP message is treated as binary or as text, so both programs have to agree on one of the two types.

    To use the text STOMP message type, the Javascript program has to skip the content-length header and the Java program has to produce messages of the text type, i.e.

    Javascript program changes:

    const message = { destination: "broadcastMessage", skipContentLengthHeader: true };
    

    Java program changes:

    ClientMessage message = session.createMessage(Message.TEXT_TYPE, false);
    message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(payload));
    

    To use the binary STOMP message type, the Java program has to produce messages that carry the content-length header, i.e.

    String payload = "Message from java " + i;
    byte[] payloadBytes = payload.getBytes();
    ClientMessage message = session.createMessage(false);
    message.putIntProperty("content-length", payloadBytes.length);
    message.getBodyBuffer().writeBytes(payloadBytes);
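
To make the text/binary distinction above more concrete, here is a minimal sketch of what a STOMP SEND frame looks like on the wire with and without the content-length header. It uses only the Java standard library and does not touch Artemis or StompJS. The class `StompFrameSketch` and its helpers `buildSendFrame` and `hasContentLength` are hypothetical names made up for this illustration; `hasContentLength` only mimics the rule described in the answer and is not the broker's actual code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class, not part of Artemis or StompJS.
class StompFrameSketch {

    // Builds a STOMP SEND frame: command line, headers, blank line, body, NUL terminator.
    // The content-length header (body size in octets) is added only when requested.
    static String buildSendFrame(String destination, String body, boolean includeContentLength) {
        StringBuilder frame = new StringBuilder("SEND\n");
        frame.append("destination:").append(destination).append('\n');
        if (includeContentLength) {
            int octets = body.getBytes(StandardCharsets.UTF_8).length;
            frame.append("content-length:").append(octets).append('\n');
        }
        frame.append('\n').append(body).append('\0');
        return frame.toString();
    }

    // Mimics the rule from the answer: a frame with a content-length header
    // counts as binary, a frame without one counts as text.
    // Only the header section (everything before the first blank line) is inspected.
    static boolean hasContentLength(String frame) {
        int headerEnd = frame.indexOf("\n\n");
        String head = headerEnd >= 0 ? frame.substring(0, headerEnd) : frame;
        String[] lines = head.split("\n");
        for (int i = 1; i < lines.length; i++) { // index 0 is the command line
            if (lines[i].startsWith("content-length:")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        for (boolean withLength : new boolean[] {false, true}) {
            String frame = buildSendFrame("broadcastMessage", "Message from java 0", withLength);
            String kind = hasContentLength(frame) ? "binary" : "text";
            System.out.println("--- handled as " + kind + " ---");
            // Show the NUL terminator as ^@ so it is visible in a terminal.
            System.out.println(frame.replace("\0", "^@"));
        }
    }
}
```

Running `main` prints both variants of the frame. Whichever of the two options from the answer you pick, the point is that the Java producer and the Javascript client end up on the same side of this header check.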