rabbitmq, stomp, spring-websocket, spring-rabbit, rabbitmq-stomp

Message is not sent to all active subscribers when using spring-websocket and rabbitmq-stomp


I have a web app built on spring-websocket over STOMP (Spring Boot 1.5.1), and I'm using RabbitMQ (3.6.6) with the STOMP plugin as a full-featured broker.

According to the RabbitMQ STOMP plugin documentation, a message sent to a destination starting with /topic/ is delivered to all active subscribers:

Topic Destinations

For simple topic destinations which deliver a copy of each message to all active subscribers, destinations of the form /topic/<name> can be used. Topic destinations support all the routing patterns of AMQP topic exchanges.

Messages sent to a topic destination that has no active subscribers are simply discarded.

But the behavior in my app is NOT consistent with the statement above!

I opened the same page in two browsers, so there are two clients connected to the websocket server. Both of them subscribed to the same destination starting with /topic/.

When I send a message to the destination /topic/<route key>, only one client receives it. The two clients take turns receiving the messages from the same destination.

In my Spring server-side app, I send the messages to the destination like this:

@Secured(User.ROLE_USER)
@MessageMapping("/comment/{liveid}")
@SendTo("/topic/comment-{liveid}")
public CommentMessage userComment(@DestinationVariable("liveid") String liveid,
                                  @AuthenticationPrincipal UserDetails activeUser,
                                  UserComment userComment) {
    logger.debug("Receiving comment message '{}' of live '{}' from user '{}'.",
            userComment, liveid, activeUser.getUsername());
    final User user = userService.findByUsername(activeUser.getUsername()).get();
    return CommentMessage.builder()
            .content(userComment.getContent())
            .sender(user.getNickname())
            .senderAvatar(user.getAvatar())
            .build();
}

On the client side, I subscribe to the durable topic like this:

$stomp.subscribe('/topic/comment-' + $scope.lives[i].id, function(payload, headers, res) {
    // do something
}, {
    'durable': true,
    'auto-delete': false
});

Below is the websocket configuration in my Spring app:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractSessionWebSocketMessageBrokerConfigurer<ExpiringSession> {

    @Value("${stompBroker.host:localhost}")
    String brokerHost;
    @Value("${stompBroker.port:61613}")
    int brokerPort;
    @Value("${stompBroker.login:guest}")
    String brokerLogin;
    @Value("${stompBroker.passcode:guest}")
    String brokerPasscode;
    @Value("${stompBroker.vhost:myvhost}")
    String brokerVHost;

    @Override
    protected void configureStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/live/ws").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Relay /topic/ destinations to the external RabbitMQ STOMP broker.
        registry.enableStompBrokerRelay("/topic/")
                .setRelayHost(brokerHost)
                .setRelayPort(brokerPort)
                .setSystemLogin(brokerLogin)
                .setSystemPasscode(brokerPasscode)
                .setVirtualHost(brokerVHost);

        // Both subscribers receive the message if the simple broker is used instead:
        // registry.enableSimpleBroker("/topic/");

        registry.setApplicationDestinationPrefixes("/app");
    }

    @Configuration
    public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {
        @Override
        protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
            messages.simpDestMatchers("/app/*").hasRole("USER");
        }

        @Override
        protected boolean sameOriginDisabled() {
            return true;
        }
    }
}

Is there anything wrong with my configuration of RabbitMQ and the STOMP plugin? Everything works well when I use the simple message broker instead of RabbitMQ.


Solution

  • It was resolved through a discussion in the rabbitmq-users group.

    I was using durable subscriptions with the same ID, so the consumers became competing consumers: they shared one subscription, and each message went to only one of them in turn.

    Either specifying a different ID for each client when using a durable queue, or using an auto-delete queue, resolved the issue.
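
    To illustrate why the same ID leads to the alternating delivery described in the question, here is a small toy model in plain JavaScript. It is not RabbitMQ or stomp client code. It assumes that each subscription ID is backed by one queue and that a queue hands every message to exactly one of its consumers in round-robin order. ToyTopicBroker, durableHeaders, the 'sub-0' ID and the 'browser-a' / 'browser-b' client names are all made up for the example.

```javascript
// Toy model only: NOT RabbitMQ code.
// Assumption: one backing queue per subscription id; a queue delivers each
// message to exactly one of its consumers, in round-robin order.
class ToyTopicBroker {
  constructor() {
    this.queues = new Map(); // subscription id -> { consumers: [], next: 0 }
  }

  subscribe(subscriptionId, consumer) {
    if (!this.queues.has(subscriptionId)) {
      this.queues.set(subscriptionId, { consumers: [], next: 0 });
    }
    this.queues.get(subscriptionId).consumers.push(consumer);
  }

  publish(message) {
    // Every queue gets a copy, but only one consumer per queue receives it.
    for (const queue of this.queues.values()) {
      const consumer = queue.consumers[queue.next % queue.consumers.length];
      queue.next += 1;
      consumer.received.push(message);
    }
  }
}

// Hypothetical helper: SUBSCRIBE headers for a durable subscription whose
// id is unique per client (clientId is whatever distinguishes the browsers).
function durableHeaders(clientId, liveId) {
  return {
    id: `comment-${liveId}-${clientId}`,
    durable: true,
    'auto-delete': false,
  };
}

const messages = ['m1', 'm2', 'm3', 'm4'];

// Case 1: both browsers use the same id, so they compete for messages.
const shared = new ToyTopicBroker();
const a1 = { received: [] };
const b1 = { received: [] };
shared.subscribe('sub-0', a1);
shared.subscribe('sub-0', b1);
messages.forEach((m) => shared.publish(m));

// Case 2: each browser uses its own id, so each has its own queue.
const separate = new ToyTopicBroker();
const a2 = { received: [] };
const b2 = { received: [] };
separate.subscribe(durableHeaders('browser-a', 42).id, a2);
separate.subscribe(durableHeaders('browser-b', 42).id, b2);
messages.forEach((m) => separate.publish(m));

console.log('same id:     ', a1.received, b1.received);
console.log('distinct ids:', a2.received, b2.received);
```

    In the first case the two clients split the messages between them; in the second, each client receives all four. In the real app, an object like the one returned by durableHeaders would presumably be passed as the headers argument of $stomp.subscribe, in place of the headers shown in the question. Whether the client library forwards a custom id header unchanged should be checked against its documentation.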