Subscribing to a removed queue with spring-websocket and RabbitMQ broker (Queue NOT_FOUND)


I have a spring-websocket (4.1.6) application on Tomcat 8 that uses RabbitMQ (3.4.4) as its STOMP message broker. When a client (Chrome 47) starts the application, it subscribes to an endpoint, which creates a durable queue. When the client unsubscribes from the endpoint, RabbitMQ cleans up the queue after 30 seconds, as defined in a custom RabbitMQ policy. When I try to reconnect to an endpoint whose queue has been cleaned up, I see the following exception in the RabbitMQ logs: "NOT_FOUND - no queue 'position-updates-user9zm_szz9' in vhost '/'\n". I don't want to use an auto-delete queue, because I have some reconnect logic in case the websocket connection dies.

The problem can be reproduced by adding the following code to the spring-websocket-portfolio example on GitHub.

In the container div in index.html, add:

<button class="btn" onclick="appModel.subscribe()">SUBSCRIBE</button>
<button class="btn" onclick="appModel.unsubscribe()">UNSUBSCRIBE</button>

In portfolio.js replace:

stompClient.subscribe("/user/queue/position-updates", function(message) {

with:

positionUpdates = stompClient.subscribe("/user/queue/position-updates", function(message) {

and also add the following:

  self.unsubscribe = function() {
    positionUpdates.unsubscribe();
  }

  self.subscribe = function() {
    positionUpdates = stompClient.subscribe("/user/queue/position-updates", function(message) {
      self.pushNotification("Position update " + message.body);
      self.portfolio().updatePosition(JSON.parse(message.body));
    });
  }

Now you can reproduce the problem as follows:

  1. Launch the application
  2. Click UNSUBSCRIBE
  3. Delete the position-updates queue in the RabbitMQ console
  4. Click SUBSCRIBE

The error message then shows up in the websocket frame (visible in Chrome DevTools) and in the RabbitMQ logs.


Solution

  • The problem is that STOMP won't recreate the queue once it has been deleted by the RabbitMQ policy. I worked around it by creating the queue myself when the SessionSubscribeEvent is fired.

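    // connectionFactory is assumed to be a configured com.rabbitmq.client.ConnectionFactory field.
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(AbstractSubProtocolEvent event) {
       if (event instanceof SessionSubscribeEvent) {
          // Native STOMP headers of the SUBSCRIBE frame, including "destination"
          MultiValueMap<String, String> nativeHeaders =
                (MultiValueMap<String, String>) event.getMessage().getHeaders().get("nativeHeaders");
          // Assumes the destination has the form "/queue/<queue name>"
          String queueName = nativeHeaders.getFirst("destination").substring("/queue/".length());

          Connection connection = null;
          try {
             connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel();
             // durable = true, exclusive = false, autoDelete = false, no extra arguments
             channel.queueDeclare(queueName, true, false, false, null);
          } catch (Exception e) {
             e.printStackTrace();
          } finally {
             // Close the connection (and with it the channel) so it is not leaked on every subscribe
             if (connection != null) {
                try { connection.close(); } catch (Exception ignored) { }
             }
          }
       }
    }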
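
    The listener above derives the queue name by cutting a fixed number of characters off the destination header, which quietly yields a wrong name if the header does not start with "/queue/" (or fails if the header is missing). Below is a minimal sketch of a more defensive extraction step. StompQueueNames and fromDestination are hypothetical names introduced here for illustration; they are not part of Spring or the RabbitMQ client, and the sketch assumes that only destinations of the form "/queue/<name>" should lead to a queue declaration.

```java
import java.util.Optional;

/** Hypothetical helper for illustration; not part of Spring or the RabbitMQ client. */
final class StompQueueNames {

    // Assumption: only destinations with this prefix map directly to a broker queue.
    private static final String QUEUE_PREFIX = "/queue/";

    private StompQueueNames() {
    }

    /**
     * Returns the queue name for destinations of the form "/queue/<name>".
     * Returns an empty Optional for null, for any other prefix, and for an empty name.
     */
    static Optional<String> fromDestination(String destination) {
        if (destination == null || !destination.startsWith(QUEUE_PREFIX)) {
            return Optional.empty();
        }
        String name = destination.substring(QUEUE_PREFIX.length());
        return name.isEmpty() ? Optional.<String>empty() : Optional.of(name);
    }
}
```

    In the listener, the queue would then only be declared when fromDestination(...) returns a value, so subscriptions to other destinations are skipped instead of producing a bogus queue name.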