
Spring websocket send message from multiple threads


I'm using the Spring WebSocket server implementation in one of my Spring-based projects. I got an error saying "The remote endpoint was in state [TEXT_PARTIAL_WRITING] which is an invalid state". I found out that the problem is writing to the same websocket from different threads at the same time.

How I temporarily fixed it: suppose I have implemented the method below,

void sendMessageToSession(WebSocketSession session, String message);

which sends a TextMessage to a websocket session. I can't make the whole method synchronized, because multiple threads may call it for different sessions and messages, and those calls should not block each other. I also can't synchronize on the session object itself (I tried, and it didn't work).

However, I fixed my problem like this:

synchronized(session.getId()){ 
    //sending message;
}

and I no longer see the issue. But using Strings as locks in synchronized blocks does not seem to be good practice. So what other solutions do I have? What is the best way to send messages asynchronously?
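Locking on a String is fragile: it only excludes other threads if getId() returns the same String instance on every call, and an interned String can also be locked by unrelated code. If a per-session lock is still wanted, one option is a dedicated lock object per session ID. Below is a minimal sketch under that assumption; `SessionLocks`, `lockFor`, `withLock` and `release` are hypothetical names, not part of Spring.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: keeps one private lock object per session ID.
class SessionLocks {
    // The lock objects never leave this class except through lockFor(),
    // so unrelated code cannot accidentally synchronize on them.
    private final Map<String, Object> locks = new ConcurrentHashMap<>();

    /** Returns the lock for this session ID, creating it atomically on first use. */
    Object lockFor(String sessionId) {
        return locks.computeIfAbsent(sessionId, id -> new Object());
    }

    /** Runs the given send action while holding that session's lock. */
    void withLock(String sessionId, Runnable sendAction) {
        synchronized (lockFor(sessionId)) {
            sendAction.run();
        }
    }

    /** Call once the session is closed and no further sends will be attempted. */
    void release(String sessionId) {
        locks.remove(sessionId);
    }
}
```

Because the map compares keys with equals(), two different String instances holding the same ID still resolve to the same lock object, and sends to different sessions do not block each other.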

PS: I already wrap the session in ConcurrentWebSocketSessionDecorator after the connection is established, and I am using the updated websocket. It didn't help.

session = new ConcurrentWebSocketSessionDecorator(session, (int) StaticConfig.MAXIMUM_WS_ASYNC_SEND_TIMEOUT, StaticConfig.MAXIMUM_WS_BINARY_BUFFER_SIZE * 2);

NOTE: I keep my websocket sessions in a map, where the key is session.getId() and the value is the session itself.

Unlike in some other websocket implementations, Spring websocket session references do not seem to be the same on each message. I saved the sessions in a map by their ID, and on each message I compare the session passed to the handler with the one I already put in my map; the comparison returns false.


Solution

  • ConcurrentWebSocketSessionDecorator works like a charm with multiple threads; it is designed for exactly that. You may have a problem in how you store or look up sessions in your map.

    Sample code:

    private final Map<String, SessionData> sessions = new ConcurrentHashMap<>();
    
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception 
    {
        // Storing the raw session like this fails under concurrent sends:
        //sessions.put(session.getId(), new SessionData(session));
    
        // Storing the session wrapped in ConcurrentWebSocketSessionDecorator is safe
        // (2000 = send time limit in ms, 4096 = buffer size limit in bytes):
        sessions.put(session.getId(), new SessionData(new ConcurrentWebSocketSessionDecorator(session, 2000, 4096)));
        super.afterConnectionEstablished(session);
    }
    
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception
    {
        sessions.remove(session.getId());
        super.afterConnectionClosed(session, status); 
    }
    
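    // The session passed in must be the decorated instance stored in the map,
    // not the raw session that the handler callbacks receive.
    public void send(WebSocketSession session, String msg) throws MessagingException {
        try {
            session.sendMessage(new TextMessage(msg));
        } catch (IOException ex) {
            // Keep the original exception as the cause.
            throw new MessagingException(ex.getMessage(), ex);
        }
    }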
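One likely explanation for the failed equality check in the question is that the handler callbacks receive the raw session, while the map holds the decorator wrapped around it, which is a different object. Sending through the raw session bypasses the decorator entirely. The sketch below shows the lookup-by-ID idea without Spring on the classpath: `Session`, `SafeSession` and `SessionRegistry` are hypothetical stand-ins (for WebSocketSession, for ConcurrentWebSocketSessionDecorator, and for the handler's map), and `SafeSession` only approximates the real decorator with a plain synchronized method.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for WebSocketSession, so the sketch runs without Spring. */
interface Session {
    String getId();
    void send(String text);
}

/** Hypothetical, simplified stand-in for the decorator: serializes sends to one session. */
class SafeSession implements Session {
    private final Session delegate;

    SafeSession(Session delegate) {
        this.delegate = delegate;
    }

    public String getId() {
        return delegate.getId();
    }

    // Only one thread at a time may write to the wrapped session.
    public synchronized void send(String text) {
        delegate.send(text);
    }
}

/** Stores the wrapped session by ID and always sends through the stored wrapper. */
class SessionRegistry {
    private final Map<String, Session> sessions = new ConcurrentHashMap<>();

    void register(Session raw) {
        sessions.put(raw.getId(), new SafeSession(raw));
    }

    void unregister(Session raw) {
        sessions.remove(raw.getId());
    }

    /** Looks up the wrapper by ID; returns false if the session is not registered. */
    boolean send(Session fromCallback, String text) {
        Session safe = sessions.get(fromCallback.getId());
        if (safe == null) {
            return false;
        }
        safe.send(text);
        return true;
    }
}
```

The point of the design is that callers never compare session references and never write to the object a callback hands them; the ID is the only key, and every write goes through the one wrapper stored for that ID.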
    

    To easily test the behaviour with multiple threads:

    public void sendMT(WebSocketSession session, String msg) throws MessagingException {
        // Start three threads that all write to the same session at once.
        for (int i = 0; i < 3; i++) {
            new Thread() {
                @Override
                public void run() {
                    send(session, msg);
                }
            }.start();
        }
    }