Search code examples
springspring-websocketspring-oauth2spring-messaging

If resource server are supposed to be stateless, how to send message to queue with websocket


I am currently working in messaging system, where resource server is stateless with oAuth 2. Now, i have to send a message to single user with a queue but problem is that spring messaging needed a session in other to send a messaging as described in https://stackoverflow.com/a/31577152/3076403.

The problem with me is how to get currently login user in stateless restful service:

@MessageMapping("/messaging")
public void messaging( Message<Object> message) {
    Principal user=    
        message.getHeaders()
             .get(SimpMessageHeaderAccessor.USER_HEADER,Principal.class); 

        messageTemplate.convertAndSend("/topic/users", user.getName());
}

Spring will use the queue when we use simpMessagingTemplate.convertAndSendToUser(...) method and pass the username associated with session id. Otherwise it will use a topic, where all subscribed clients will eventually read the same message returned from the server.

As I have no session in resource server and need queue to send message to individual user.Any comments and ideas appreciated


Solution

  • Finally after all i get a solution. By decoding json web token for username and providing authentication to username solve above problems. JwtAuthentication is custom class which is responsible for decoding JWT and providing authentication to username of JWT

    @Configuration
    @EnableWebSocketMessageBroker
    @Order(Ordered.HIGHEST_PRECEDENCE + 99)
    public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    
        @Autowired
        private SimpUserRegistry userRegistry;
    
        @Override
        public void configureMessageBroker(MessageBrokerRegistry config) {
            config.enableSimpleBroker("/topic","/queue");
            // use the /app prefix for others
            config.setApplicationDestinationPrefixes("/app");
        }
    
        @Autowired
        private JwtAuthentication jwtAuthentication;
    
        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            // use the /messaging endpoint (prefixed with /app as configured above) for incoming requests
            registry.addEndpoint("/messaging").setAllowedOrigins("http://localhost:8080").withSockJS();
        }
    
        @Override
        public void configureClientInboundChannel(ChannelRegistration registration) {
            registration.setInterceptors(new ChannelInterceptorAdapter() {
                @Override
                public Message<?> preSend(Message<?> message, MessageChannel channel) {
                    StompHeaderAccessor accessor =
                            MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                    List<String> tokenList = accessor.getNativeHeader("Authorization");
                    String token = null;
                    if(tokenList != null && tokenList.size() > 0) {
                      token = tokenList.get(0).replaceAll("Bearer", "").trim();
                    }
                    if (StompCommand.CONNECT.equals(accessor.getCommand()) || StompCommand.SUBSCRIBE.equals(accessor.getCommand()) || StompCommand.SEND.equals(accessor.getCommand()) ) {
                        Authentication auth =  SecurityContextHolder.getContext().getAuthentication();
                        if(auth==null){
                             Authentication user = jwtAuthentication.getAuthentication(token); // access authentication header(s)
                             SecurityContextHolder.getContext().setAuthentication(user);
                              ((DefaultSimpUserRegistry) userRegistry).onApplicationEvent(new SessionConnectedEvent(this, (Message<byte[]>) message, auth));
                             accessor.setUser(user);
                        } else {
                            accessor.setUser(auth);
                             ((DefaultSimpUserRegistry) userRegistry).onApplicationEvent(new SessionConnectedEvent(this, (Message<byte[]>) message, auth));
                        }
                    }
                    accessor.setLeaveMutable(true);
                    return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
                }
            });
        }
    }
    

    In application context we need to register SimpUserRegistry @Bean @Primary public SimpUserRegistry userRegistry() { return new DefaultSimpUserRegistry(); }

          @Bean
          @Primary
          public UserDestinationResolver userDestinationResolver() {
            return new DefaultUserDestinationResolver(userRegistry());
          }
    

    Now We can send message to specific user

    public void handle(Exchange exchange) {
            Message camelMessage = exchange.getIn();
            com.livetalk.user.utils.Message message = camelMessage.getBody( com.livetalk.user.utils.Message.class);
            // send the message specifically to the destination user by using STOMP's user-directed messaging
            msgTemplate.convertAndSendToUser(message.getRecipient(), "/queue/messages", message, defaultHeaders);
        }