Tags: spring, rabbitmq, sockjs, spring-websocket

Spring Websocket exception during close when using StompBrokerRelay (RabbitMQ)


When using RabbitMQ as a STOMP broker relay with Spring WebSocket, the following exception is logged from Reactor whenever a websocket session closes:

2015-03-30 11:37:23.647 [reactor-tcp-io-1] DEBUG o.s.m.s.s.StompBrokerRelayMessageHandler - Failure while clearing TCP connection state in session 39cb9vbm
java.util.NoSuchElementException: null
    at com.gs.collections.impl.list.mutable.MutableIterator.next(MutableIterator.java:57) ~[gs-collections-5.1.0.jar:na]
    at com.gs.collections.impl.list.mutable.MultiReaderFastList$UntouchableListIterator.next(MultiReaderFastList.java:1267) ~[gs-collections-5.1.0.jar:na]
    at reactor.event.registry.CachingRegistry$2.value(CachingRegistry.java:70) ~[reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.registry.CachingRegistry$2.value(CachingRegistry.java:67) ~[reactor-core-1.1.5.RELEASE.jar:na]
    at com.gs.collections.impl.list.mutable.MultiReaderFastList.withWriteLockAndDelegate(MultiReaderFastList.java:179) ~[gs-collections-5.1.0.jar:na]
    at reactor.event.registry.CachingRegistry.unregister(CachingRegistry.java:67) ~[reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.net.AbstractNetChannel.close(AbstractNetChannel.java:185) [reactor-net-1.1.5.RELEASE.jar:na]
    at org.springframework.messaging.tcp.reactor.Reactor11TcpConnection.close(Reactor11TcpConnection.java:63) ~[spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$StompConnectionHandler.clearConnection(StompBrokerRelayMessageHandler.java:808) [spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$StompConnectionHandler.afterDisconnectSent(StompBrokerRelayMessageHandler.java:777) [spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$StompConnectionHandler.access$3100(StompBrokerRelayMessageHandler.java:496) [spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$StompConnectionHandler$3.onSuccess(StompBrokerRelayMessageHandler.java:749) [spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$StompConnectionHandler$3.onSuccess(StompBrokerRelayMessageHandler.java:745) [spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.util.concurrent.ListenableFutureCallbackRegistry.success(ListenableFutureCallbackRegistry.java:119) [spring-core-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at org.springframework.messaging.tcp.reactor.AbstractPromiseToListenableFutureAdapter$1.accept(AbstractPromiseToListenableFutureAdapter.java:57) [spring-messaging-4.1.6.RELEASE.jar:4.1.6.RELEASE]
    at reactor.core.action.CallbackAction.doAccept(CallbackAction.java:36) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.action.Action.accept(Action.java:52) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.action.Action.accept(Action.java:32) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.routing.ArgumentConvertingConsumerInvoker.invoke(ArgumentConvertingConsumerInvoker.java:73) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.routing.ConsumerFilteringEventRouter.route(ConsumerFilteringEventRouter.java:78) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.dispatch.SynchronousDispatcher.dispatch(SynchronousDispatcher.java:75) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.Reactor.notify(Reactor.java:242) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.Reactor.notify(Reactor.java:249) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.Reactor.notify(Reactor.java:57) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.composable.Composable.notifyValue(Composable.java:494) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.composable.Promise.notifyValue(Promise.java:571) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.composable.Deferred.acceptEvent(Deferred.java:118) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.composable.Deferred.accept(Deferred.java:83) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.net.netty.NettyNetChannel$2.operationComplete(NettyNetChannel.java:101) [reactor-net-1.1.5.RELEASE.jar:na]
    at reactor.net.netty.NettyNetChannel$2.operationComplete(NettyNetChannel.java:89) [reactor-net-1.1.5.RELEASE.jar:na]
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:682) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:624) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:144) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at reactor.net.netty.NettyNetChannel.write(NettyNetChannel.java:89) [reactor-net-1.1.5.RELEASE.jar:na]
    at reactor.net.netty.NettyNetChannel.write(NettyNetChannel.java:83) [reactor-net-1.1.5.RELEASE.jar:na]
    at reactor.net.AbstractNetChannel.write(AbstractNetChannel.java:239) [reactor-net-1.1.5.RELEASE.jar:na]
    at reactor.net.AbstractNetChannel$WriteConsumer.accept(AbstractNetChannel.java:308) [reactor-net-1.1.5.RELEASE.jar:na]
    at reactor.core.Reactor$5.accept(Reactor.java:373) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.core.Reactor$5.accept(Reactor.java:370) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.routing.ArgumentConvertingConsumerInvoker.invoke(ArgumentConvertingConsumerInvoker.java:73) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.routing.ConsumerFilteringEventRouter.route(ConsumerFilteringEventRouter.java:103) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.dispatch.AbstractLifecycleDispatcher.route(AbstractLifecycleDispatcher.java:64) [reactor-core-1.1.5.RELEASE.jar:na]
    at reactor.event.dispatch.AbstractMultiThreadDispatcher$MultiThreadTask.run(AbstractMultiThreadDispatcher.java:55) [reactor-core-1.1.5.RELEASE.jar:na]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:370) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) [netty-all-4.0.20.Final.jar:4.0.20.Final]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]

The exception does not occur when using the simple broker (enableSimpleBroker) instead of the relay.

We are using the following versions:

  • Spring Framework 4.1.6 (same version for websocket, messaging, etc.)
  • Spring Security 4.0.0.RC1
  • Project Reactor 1.1.5.RELEASE
  • Spring Session 1.0.0.RELEASE
  • Javax Websocket API 1.0
  • Tomcat 7.0.54

Our config is very straightforward:

public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport {
    @Autowired
    private SessionRepository sessionRepository;

    @Autowired
    private ApplicationEventPublisher eventPublisher;

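    // The placeholder default is separated by ':' (not '='); falls back to true when the property is unset
    @Value("${rabbitmq.enabled:true}")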
    private boolean rabbitMqEnabled;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp")
                .setHandshakeHandler(new DefaultHandshakeHandler(new TomcatRequestUpgradeStrategy()))
                .addInterceptors(sessionRepositoryInterceptor())
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        if (rabbitMqEnabled) {
            registry.enableStompBrokerRelay("/topic", "/queue")
                    .setClientLogin("....").setClientPasscode("....")
                    .setSystemLogin("....").setSystemPasscode("....")
                    .setRelayHost("....").setRelayPort(....)
                    .setVirtualHost("....");
        } else {
            registry.enableSimpleBroker("/topic", "/queue");
        }

        registry.setApplicationDestinationPrefixes("/ws");
    }

    ...
}

Any ideas or guidance?

I've also filed a bug ticket to track this.


Solution

  • The Spring developers have marked this as expected behavior.

    For anyone who comes across this: the stack trace is logged at DEBUG level, so the simplest way to get rid of it is to raise the log level for StompBrokerRelayMessageHandler to INFO or higher.

    For logback:

    <logger name="org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler" level="INFO"/>
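
For context, here is a minimal sketch of a complete logback.xml showing where that logger element could sit. The appender name, the output pattern, and the DEBUG root level are assumptions made for illustration and are not taken from the original setup; only the logger line itself comes from the answer above.

```xml
<configuration>
  <!-- Console appender; name and pattern are illustrative assumptions -->
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
  </appender>

  <!-- Hides the DEBUG-level "Failure while clearing TCP connection state" trace
       from the relay handler only; INFO and above are still logged -->
  <logger name="org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler" level="INFO"/>

  <!-- Root level assumed to be DEBUG here; all other loggers are unaffected -->
  <root level="DEBUG">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>
```

Because the override is scoped to a single logger name, DEBUG output from the rest of org.springframework.messaging stays available for troubleshooting.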