websocket activemq-classic stomp spring-websocket

WebSocket messages are lost while client is reconnecting


I use STOMP.js on the front end and ActiveMQ on the back end for sending push notifications to the client. The client first subscribes to a topic with the following code:

function stompConnect() {
    console.log('STOMP: Attempting connection');
    // recreate the stompClient to use a new WebSocket
    var socket = new SockJS('/websocket');
    var stompClient = Stomp.over(socket);

    stompClient.connect({}, function(frame) {
        stompClient.subscribe('/topic/table-updates', function(notification){
            showNotification(JSON.parse(notification.body));
        });
    }, function (error) {
        console.log('STOMP: ' + error);
        setTimeout(stompConnect, 10000);
        console.log('STOMP: Reconnecting in 10 seconds');
    });
}

stompConnect();

Sometimes the underlying WebSocket connection is lost, and the client has to reconnect and subscribe to the topic again (after a 10-second delay). As a result, some messages from the server are lost while the client is reconnecting. Is there any way to prevent this?

I use Spring WebSocket on the back end. Here is the configuration:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Value("${stomp.port}")
    private Integer stompPort;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry
            .enableStompBrokerRelay("/topic/")
            .setRelayPort(stompPort);
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public BrokerService brokerService() throws Exception {
        final BrokerService broker = BrokerFactory.createBroker(
            String.format("broker:(vm://localhost,stomp://localhost:%d)?persistent=false", stompPort));

        broker.addShutdownHook(new SpringContextHook());
        return broker;
    }
} 

Solution

  • Certainly doable.

    You'll need to change some things up.

    It's all described here: Using Queue Browsers to Implement Durable Topic Subscriptions

    But I'll give you a quick rundown.

    Some important changes to your messaging structure:

    • Subscribe your WebSocket consumers to "/queue/" destinations instead of topics, and set up queue browsing.
    • Send messages from your producer using Reliable Messaging.
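
    As a rough sketch of the producer side: my reading of the Apollo STOMP documentation is that "Reliable Messaging" means adding persistent:true to the SEND frame, plus a receipt header when you are not sending in a transaction. The helper name reliableSendHeaders and the receipt id are made up for illustration, and I have not checked how the embedded broker from the question treats these headers.

```javascript
// Hypothetical helper: headers for a SEND frame that the broker should
// persist and confirm. The header names are an assumption based on the
// Apollo STOMP documentation.
function reliableSendHeaders(receiptId) {
    return {
        'persistent': 'true',   // ask the broker to persist the message
        'receipt': receiptId    // ask the broker to reply with a RECEIPT frame
    };
}

// Usage with an already connected STOMP.js client (not created here):
// stompClient.send('/queue/important-stuff',
//                  reliableSendHeaders('msg-1'),
//                  JSON.stringify({ table: 'orders' }));
```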

    Here's a basic example:

    1. The user opens the web page and subscribes to /queue/important-stuff with queue browsing, starting from a sequence number of -1 (new messages only).
    2. For each message that arrives, store its sequence number somewhere while the user has fun with the important data in the messages.
    3. The user's WebSocket connection drops.
    4. Detect this and reconnect. Be sure to use Jeff's example here as a reference: a WebSocket can only be opened once, so each reconnect needs a new one.
    5. Re-subscribe and pass the stored sequence number in the from-seq header.
    6. The Apollo broker will send the missing messages.
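
    The steps above could look roughly like this on the client. This is only a sketch: createSeqTracker is a made-up helper, and the header names (browser, browser-end, include-seq, from-seq) are what I remember from the Apollo STOMP documentation, so check them against your broker. Because I am not sure whether from-seq is inclusive, the tracker also ignores any message whose sequence number it has already seen.

```javascript
// Hypothetical helper: remembers the highest sequence number seen and
// builds SUBSCRIBE headers for a browsing subscription.
function createSeqTracker() {
    var lastSeq = null; // null until the first message arrives

    return {
        // Returns true if the message is new, false if it was already seen.
        record: function (message) {
            var seq = parseInt(message.headers.seq, 10);
            if (isNaN(seq) || (lastSeq !== null && seq <= lastSeq)) {
                return false;
            }
            lastSeq = seq;
            return true;
        },
        // Headers for the first subscription (-1) or a re-subscription.
        subscribeHeaders: function () {
            return {
                'browser': 'true',       // browse the queue, do not consume
                'browser-end': 'false',  // keep the subscription open
                'include-seq': 'seq',    // put the sequence number in "seq"
                'from-seq': String(lastSeq === null ? -1 : lastSeq)
            };
        }
    };
}

// Usage inside the connect callback from the question (client not created here):
// var tracker = createSeqTracker();   // keep this outside stompConnect()
// stompClient.subscribe('/queue/important-stuff', function (notification) {
//     if (tracker.record(notification)) {
//         showNotification(JSON.parse(notification.body));
//     }
// }, tracker.subscribeHeaders());
```

    The tracker has to live outside stompConnect() so that it survives the reconnect; otherwise every new connection starts again from -1.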

    Keep in mind that the broker can get a tad clogged with messages, so you'll need to follow the advice on that page and:

    • Send messages to the queue with an expiration time so that they are automatically deleted once the expiration time is reached.
    • Periodically run a normal consumer application that can cursor through the queue and delete messages that are deemed no longer needed.
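
    For the expiration time, a minimal sketch, assuming the broker reads an expires header on the SEND frame as an absolute time in milliseconds since the Unix epoch (that is how I understand the Apollo documentation). The helper name expiresHeader and the one-minute lifetime are just examples.

```javascript
// Hypothetical helper: builds the "expires" header for a SEND frame.
// ttlMs is how long the message may live; nowMs defaults to the current time.
function expiresHeader(ttlMs, nowMs) {
    var base = (nowMs === undefined) ? Date.now() : nowMs;
    return { 'expires': String(base + ttlMs) };
}

// Usage with an already connected STOMP.js client (not created here):
// stompClient.send('/queue/important-stuff',
//                  expiresHeader(60 * 1000),
//                  JSON.stringify({ table: 'orders' }));
```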