
Spring WebSocketMessageBrokerConfigurer with Artemis 2.6.3 Multicast (topic) not working


An Artemis multicast address and queue are not behaving as expected. My idea is to create groups, or messaging aimed at a specific user who can have multiple WebSocket sessions (web, Android, etc.). The server publishes a notification to an Artemis multicast address and all subscribers should receive it. In the current scenario I just force the user 'luisalves00' and create more than one session. In Artemis I can see 2 consumers (I'm not sure how Spring's message broker relay does its job), but the consumers behave like round robin rather than publish-subscribe. With Spring's in-memory broker it works fine, but it's not durable, so when there's no subscriber connected the messages are dropped. Here is the code I'm using:

Client side part:

function connect() {
    var socket = new SockJS('/notification-websocket');
    stompClient = Stomp.over(socket);
    var headers = {
        // todo: server will handle this logic
        'client-id': 'luisalves00',
        'durable-subscription-name': 'luisalves00',
        'id' : 'luisalves00'
    };
    stompClient.connect(headers, function(frame) {
        setConnected(true);
        console.log('Connected: ' + frame);
        // todo: server will handle this logic
        stompClient.subscribe('/topic/notification/username/' + 'luisalves00', function(notification) {
            showNotification(JSON.parse(notification.body).content);
        }, headers);
    });
}

Broker Relay config:

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // Artemis ->
        // tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true
        config.enableStompBrokerRelay("/topic").setRelayHost("127.0.0.1").setRelayPort(61613);
        config.setApplicationDestinationPrefixes("/app");
        //config.enableSimpleBroker("/topic");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        logger.info("Registering the stomp endpoints.");

        registry.addEndpoint("/notification-websocket").setAllowedOrigins("*").withSockJS();

    }

Server dummy notification producer:

@Scheduled(fixedDelay = 20000)
public void scheduleTaskWithFixedDelay() {
    final Notification message = new Notification(UUID.randomUUID().toString() + " -> "  + dateTimeFormatter.format(LocalDateTime.now()));
    try {
        final String user = "luisalves00";
        logger.info("Creating msg={}", message);
        final Map<String, Object> headers = new HashMap<>();
        headers.put("subscription-id", user);
        template.convertAndSend("/topic/notification/username/" + user, message, headers);
    } catch (Exception e) {
        logger.error("", e);
    }
}

When the client subscribes, Artemis creates an address and a persistent queue with these parameters:

Address:
id=2147496008 name=/topic/notification/group1/ routingType=[MULTICAST] queueCount=1

Queue
id=2147496011 
name=group1.group1 
address=/topic/notification/group1/ 
routingType=MULTICAST
durable=true
maxConsumers=-1
purgeOnNoConsumers=false 
consumerCount=0

Solution

  • To make the durable subscription work you must use:

    'client-id': '<some unique identifier for each client>'
    'durable-subscription-name': 'tech-news'
    

    For my implementation I stopped using durable subscriptions, since the idea is for a notification to be delivered as soon as it is created (low latency). If the consumer is not connected, it can query a history database to fetch the old messages it did not receive. If you really want durability, I suggest handling 'client-id' and 'durable-subscription-name' on the server side for the connected user. If the user has no ongoing session, create a durable queue. Any further session the user opens should be non-durable, because it receives the same messages as the durable one. If the first session dies, the user still receives messages on the other sessions. When all sessions have died and the user reconnects, that connection is the first session again and receives every message that was not delivered on the durable queue (probably including some already received on the non-durable sessions). The user then has the full message history, although, as said above, I think history should be handled another way.
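
The server-side bookkeeping described above could look roughly like the following. This is only a minimal sketch in plain Java, not code from my project: the class `DurableSessionRegistry`, its method names, and the session ids are all hypothetical, and using the user name as both 'client-id' and 'durable-subscription-name' is an assumption carried over from the question's client code. It only decides whether a new session should subscribe durably; wiring it into Spring is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: tracks open sessions per user and decides which one subscribes durably. */
final class DurableSessionRegistry {

    // user name -> ids of that user's currently open sessions
    private final Map<String, Set<String>> sessionsByUser = new HashMap<>();

    /** Registers a session; returns true only when the user had no open session before. */
    public synchronized boolean register(String user, String sessionId) {
        Set<String> sessions = sessionsByUser.computeIfAbsent(user, u -> new HashSet<>());
        boolean firstSession = sessions.isEmpty();
        sessions.add(sessionId);
        return firstSession;
    }

    /** Forgets a closed session; once the last one is gone, the next register() is durable again. */
    public synchronized void unregister(String user, String sessionId) {
        Set<String> sessions = sessionsByUser.get(user);
        if (sessions == null) {
            return;
        }
        sessions.remove(sessionId);
        if (sessions.isEmpty()) {
            sessionsByUser.remove(user);
        }
    }

    /** Builds the extra STOMP headers: durable sessions get the two headers, others get none. */
    public static Map<String, String> subscribeHeaders(String user, boolean durable) {
        Map<String, String> headers = new HashMap<>();
        if (durable) {
            // Assumption: the user name is reused for both values, as in the question.
            headers.put("client-id", user);
            headers.put("durable-subscription-name", user);
        }
        return headers;
    }

    public static void main(String[] args) {
        DurableSessionRegistry registry = new DurableSessionRegistry();
        boolean webDurable = registry.register("luisalves00", "web");
        boolean androidDurable = registry.register("luisalves00", "android");
        System.out.println("web: " + subscribeHeaders("luisalves00", webDurable));
        System.out.println("android: " + subscribeHeaders("luisalves00", androidDurable));
    }
}
```

In a Spring application, register() and unregister() would presumably be called from listeners for the WebSocket session connect and disconnect events, but I have not tested that wiring.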