I have a spring-websocket (4.1.6) application on Tomcat8 that uses a STOMP RabbitMQ (3.4.4) message broker for messaging. When a client (Chrome 47) starts the application, it subscribes to an endpoint creating a durable queue. When this client unsubscribes from the endpoint, the queue will be cleaned up by RabbitMQ after 30 seconds as defined in a custom made RabbitMQ policy. When I try to reconnect to an endpoint that has a queue that was cleaned up, I receive the following exception in the RabbitMQ logs: "NOT_FOUND - no queue 'position-updates-user9zm_szz9' in vhost '/'\n". I don't want to use an auto-delete queue since I have some reconnect logic in case the websocket connection dies.
This problem can be reproduced by adding the following code to the spring-websocket-portfolio github example.
In the container div in the index.html add:
<button class="btn" onclick="appModel.subscribe()">SUBSCRIBE</button>
<button class="btn" onclick="appModel.unsubscribe()">UNSUBSCRIBE</button>
In portfolio.js replace:
stompClient.subscribe("/user/queue/position-updates", function(message) {
with:
positionUpdates = stompClient.subscribe("/user/queue/position-updates", function(message) {
and also add the following:
self.unsubscribe = function() {
positionUpdates.unsubscribe();
}
self.subscribe = function() {
positionUpdates = stompClient.subscribe("/user/queue/position-updates", function(message) {
self.pushNotification("Position update " + message.body);
self.portfolio().updatePosition(JSON.parse(message.body));
});
}
Now you can reproduce the problem by:
Find the error message in the websocket frame via the chrome devtools and in the RabbitMQ logs.
The problem is Stomp won't recreate the queue if it get's deleted by the RabbitMQ policy. I worked around it by creating the queue myself when the SessionSubscribeEvent is fired.
public void onApplicationEvent(AbstractSubProtocolEvent event) {
if (event instanceof SessionSubscribeEvent) {
MultiValueMap nativeHeaders = (MultiValueMap)event.getMessage().getHeaders().get("nativeHeaders");
List destination = (List)nativeHeaders.get("destination");
String queueName = ((String)destination.get(0)).substring("/queue/".length());
try {
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, true, false, false, null);
} catch (IOException e) {
e.printStackTrace();
}
}
}