I am currently working on a messaging system where the resource server is stateless and secured with OAuth 2. I now have to send a message to a single user through a queue, but the problem is that Spring messaging needs a session in order to send such a message, as described in https://stackoverflow.com/a/31577152/3076403.
My problem is how to get the currently logged-in user in a stateless RESTful service:
    @MessageMapping("/messaging")
    public void messaging(Message<Object> message) {
        Principal user = message.getHeaders()
                .get(SimpMessageHeaderAccessor.USER_HEADER, Principal.class);
        messageTemplate.convertAndSend("/topic/users", user.getName());
    }
Spring uses the queue when we call simpMessagingTemplate.convertAndSendToUser(...) and pass the username associated with the session id. Otherwise it uses a topic, where all subscribed clients eventually read the same message returned from the server.
Since I have no session in the resource server, I need another way to use a queue to send a message to an individual user. Any comments and ideas are appreciated.
I finally found a solution: decode the JSON Web Token to get the username, then build an Authentication for that username. JwtAuthentication is a custom class that is responsible for decoding the JWT and providing the Authentication for the username it contains.
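The JwtAuthentication class itself is not shown here. As a rough illustration of the "decode the token for the username" step only, here is a minimal sketch in plain Java. The class name JwtUsernameSketch, the helper names, and the claim name "user_name" are assumptions for illustration. The sketch does not verify the signature, so a real implementation must validate the token with a proper JWT library before trusting any claim.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not the JwtAuthentication class from this answer.
final class JwtUsernameSketch {

    // Removes a leading "Bearer" prefix from an Authorization header value.
    static String stripBearer(String headerValue) {
        return headerValue.replaceFirst("^Bearer\\s*", "").trim();
    }

    // Reads a string claim from the UNVERIFIED payload (second segment) of a JWT.
    // Returns null when the claim is absent. No signature check is performed.
    static String claim(String jwt, String claimName) {
        String[] parts = jwt.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("not a JWT: expected header.payload.signature");
        }
        String json = new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
        // Naive lookup of "claimName": "value"; a JSON parser is the better choice in real code.
        Matcher m = Pattern
                .compile("\"" + Pattern.quote(claimName) + "\"\\s*:\\s*\"([^\"]*)\"")
                .matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Build a demo token locally; the header and signature segments are placeholders.
        String payload = Base64.getUrlEncoder().withoutPadding()
                .encodeToString("{\"user_name\":\"alice\"}".getBytes(StandardCharsets.UTF_8));
        String header = "Bearer e30." + payload + ".signature";
        System.out.println(claim(stripBearer(header), "user_name"));
    }
}
```

The username obtained this way is what ends up as the name of the Authentication that the interceptor attaches to each STOMP message.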
    @Configuration
    @EnableWebSocketMessageBroker
    @Order(Ordered.HIGHEST_PRECEDENCE + 99)
    public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

        @Autowired
        private SimpUserRegistry userRegistry;

        @Autowired
        private JwtAuthentication jwtAuthentication;

        @Override
        public void configureMessageBroker(MessageBrokerRegistry config) {
            config.enableSimpleBroker("/topic", "/queue");
            // use the /app prefix for others
            config.setApplicationDestinationPrefixes("/app");
        }

        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            // use the /messaging endpoint (prefixed with /app as configured above) for incoming requests
            registry.addEndpoint("/messaging").setAllowedOrigins("http://localhost:8080").withSockJS();
        }

        @Override
        public void configureClientInboundChannel(ChannelRegistration registration) {
            registration.setInterceptors(new ChannelInterceptorAdapter() {
                @Override
                public Message<?> preSend(Message<?> message, MessageChannel channel) {
                    StompHeaderAccessor accessor =
                            MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                    // read the bearer token from the native STOMP "Authorization" header
                    List<String> tokenList = accessor.getNativeHeader("Authorization");
                    String token = null;
                    if (tokenList != null && !tokenList.isEmpty()) {
                        token = tokenList.get(0).replaceAll("Bearer", "").trim();
                    }
                    if (StompCommand.CONNECT.equals(accessor.getCommand())
                            || StompCommand.SUBSCRIBE.equals(accessor.getCommand())
                            || StompCommand.SEND.equals(accessor.getCommand())) {
                        Authentication auth = SecurityContextHolder.getContext().getAuthentication();
                        if (auth == null) {
                            // no authentication yet: build one from the JWT
                            auth = jwtAuthentication.getAuthentication(token);
                            SecurityContextHolder.getContext().setAuthentication(auth);
                        }
                        accessor.setUser(auth);
                        // register the user with the non-null authentication
                        ((DefaultSimpUserRegistry) userRegistry).onApplicationEvent(
                                new SessionConnectedEvent(this, (Message<byte[]>) message, auth));
                    }
                    accessor.setLeaveMutable(true);
                    return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
                }
            });
        }
    }
In the application context we also need to register the SimpUserRegistry and the UserDestinationResolver:
    @Bean
    @Primary
    public SimpUserRegistry userRegistry() {
        return new DefaultSimpUserRegistry();
    }

    @Bean
    @Primary
    public UserDestinationResolver userDestinationResolver() {
        return new DefaultUserDestinationResolver(userRegistry());
    }
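To see why the registry entry matters, here is a toy model in plain Java of how a user-directed send is resolved. It is a sketch, not Spring's actual classes: UserDestinationSketch and its methods are hypothetical names, and the "-user" + session id suffix only mirrors, to my understanding, the naming the default resolver uses with the simple broker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of user-destination resolution; hypothetical, not a Spring class.
final class UserDestinationSketch {

    // username -> ids of the WebSocket sessions registered for that user
    private final Map<String, Set<String>> sessionsByUser = new HashMap<>();

    // Records that the given session belongs to the given user.
    void register(String username, String sessionId) {
        sessionsByUser.computeIfAbsent(username, k -> new LinkedHashSet<>()).add(sessionId);
    }

    // Expands a user-directed send into one session-specific destination per session.
    List<String> resolve(String username, String destination) {
        List<String> targets = new ArrayList<>();
        Set<String> sessions = sessionsByUser.getOrDefault(username, Collections.<String>emptySet());
        for (String sessionId : sessions) {
            targets.add(destination + "-user" + sessionId);
        }
        return targets;
    }

    public static void main(String[] args) {
        UserDestinationSketch registry = new UserDestinationSketch();
        // Unknown user: nothing to deliver to.
        System.out.println(registry.resolve("alice", "/queue/messages")); // prints []
        registry.register("alice", "abc123");
        System.out.println(registry.resolve("alice", "/queue/messages")); // prints [/queue/messages-userabc123]
    }
}
```

In this model a send to a user with no registered session resolves to nothing, which is the situation in a stateless server until the interceptor associates the JWT username with the STOMP session.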
Now we can send a message to a specific user:
    public void handle(Exchange exchange) {
        Message camelMessage = exchange.getIn();
        com.livetalk.user.utils.Message message =
                camelMessage.getBody(com.livetalk.user.utils.Message.class);
        // send the message specifically to the destination user by using STOMP's user-directed messaging
        msgTemplate.convertAndSendToUser(message.getRecipient(), "/queue/messages", message, defaultHeaders);
    }