Search code examples
springspring-mvcspring-websocket

Spring Websocket in a tomcat cluster


In our current application, we use Spring Websockets over STOMP. We are looking to scale horizontally. Are there any best practices on how we should handle websocket traffic over multiple tomcat instances and how can we maintain session info across multiple nodes.Is there a working sample that one can refer to?


Solution

  • Your requirement can be divided into 2 sub tasks:

    1. Maintain session info across multiple nodes: You can try Spring Sessions clustering backed by Redis (see: HttpSession with Redis). This very very simple and already has support for Spring Websockets (see: Spring Session & WebSockets).

    2. Handle websockets traffic over multiple tomcat instances: There are several ways to do that.

      • The first way: Using a full-featured broker (eg: ActiveMQ) and try new feature Support multiple WebSocket servers (from: 4.2.0 RC1)
      • The second way: Using a full-feature broker and implement a distributed UserSessionRegistry (eg: Using Redis :D ). The default implementation DefaultUserSessionRegistry using an in-memory storage.

    Updated: I've written a simple implementation using Redis, try it if you are interested

    To configure a full-featured broker (broker relay), you can try:

    public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    
        ...
    
        @Autowired
        private RedisConnectionFactory redisConnectionFactory;
    
        @Override
        public void configureMessageBroker(MessageBrokerRegistry config) {
            config.enableStompBrokerRelay("/topic", "/queue")
                .setRelayHost("localhost") // broker host
                .setRelayPort(61613) // broker port
                ;
            config.setApplicationDestinationPrefixes("/app");
        }
    
        @Bean
        public UserSessionRegistry userSessionRegistry() {
            return new RedisUserSessionRegistry(redisConnectionFactory);
        }
    
        ...
    }
    

    and

    import java.util.Set;
    
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.BoundHashOperations;
    import org.springframework.data.redis.core.BoundSetOperations;
    import org.springframework.data.redis.core.RedisOperations;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    import org.springframework.messaging.simp.user.UserSessionRegistry;
    import org.springframework.util.Assert;
    
    /**
     * An implementation of {@link UserSessionRegistry} backed by Redis.
     * @author thanh
     */
    public class RedisUserSessionRegistry implements UserSessionRegistry {
    
        /**
         * The prefix for each key of the Redis Set representing a user's sessions. The suffix is the unique user id.
         */
        static final String BOUNDED_HASH_KEY_PREFIX = "spring:websockets:users:";
    
        private final RedisOperations<String, String> sessionRedisOperations;
    
        @SuppressWarnings("unchecked")
        public RedisUserSessionRegistry(RedisConnectionFactory redisConnectionFactory) {
            this(createDefaultTemplate(redisConnectionFactory));
        }
    
        public RedisUserSessionRegistry(RedisOperations<String, String> sessionRedisOperations) {
            Assert.notNull(sessionRedisOperations, "sessionRedisOperations cannot be null");
            this.sessionRedisOperations = sessionRedisOperations;
        }
    
        @Override
        public Set<String> getSessionIds(String user) {
            Set<String> entries = getSessionBoundHashOperations(user).members();
            return (entries != null) ? entries : Collections.<String>emptySet();
        }
    
        @Override
        public void registerSessionId(String user, String sessionId) {
            getSessionBoundHashOperations(user).add(sessionId);
        }
    
        @Override
        public void unregisterSessionId(String user, String sessionId) {
            getSessionBoundHashOperations(user).remove(sessionId);
        }
    
        /**
         * Gets the {@link BoundHashOperations} to operate on a username
         */
        private BoundSetOperations<String, String> getSessionBoundHashOperations(String username) {
            String key = getKey(username);
            return this.sessionRedisOperations.boundSetOps(key);
        }
    
        /**
         * Gets the Hash key for this user by prefixing it appropriately.
         */
        static String getKey(String username) {
            return BOUNDED_HASH_KEY_PREFIX + username;
        }
    
        @SuppressWarnings("rawtypes")
        private static RedisTemplate createDefaultTemplate(RedisConnectionFactory connectionFactory) {
            Assert.notNull(connectionFactory, "connectionFactory cannot be null");
            StringRedisTemplate template = new StringRedisTemplate(connectionFactory);
            template.setKeySerializer(new StringRedisSerializer());
            template.setValueSerializer(new StringRedisSerializer());
            template.afterPropertiesSet();
            return template;
        }
    
    }