Search code examples
javanode.jswebsocketstompsockjs

Receiving only Headers after connecting Stomp over SockJS and Subscribing for messages


I am writing a client to test WebSocket connections and receive data.

After establishing the Websocket over Stomp, I am only receiving headers after subscription, I am not getting the message content/body (its not invoking handleFrame() at all)

I am seeing MessageConversionExceptions. Please look into the below snippets. I have highlighted and commented few important pieces.

Snippets (Web Socket client) :

public class StompService {
    public static final String WS_URL_SUFFIX = "/monitoring-channel";

    public StompSession createStompWSConnection(String socketURL) throws Exception  {
        List<Transport> transports = new ArrayList<Transport>();
        transports.add(new WebSocketTransport(new StandardWebSocketClient()));

        SockJsClient sockJsClient = new SockJsClient(transports);
        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
        stompClient.setMessageConverter(new StringMessageConverter());

        //socketURL is "http://localhost:8090/monitoring-channel".. This works in the Node client
        StompSession session = stompClient.connect(socketURL, new SessionHandler()).get();
        System.out.println("Session : " + session);
        return session;
    }

    public boolean subscribeStreamData(StompSession wsSession, String watchId, String type, StompFrameHandler handler) {
        type = type.toUpperCase()
        String topic = "/topic/" + type + "/" + watchId;        
        wsSession.subscribe(topic, handler);
        return true;
    }
}

public class DefaultStompFrameHandler implements StompFrameHandler {
    private static final Logger log = LogManager.getLogger();
    BlockingQueue<String> blockingQueue;

    @Override
    public Type getPayloadType(StompHeaders stompHeaders) {
        log.debug("getPayloadType() :: stompHeaders = " + stompHeaders);  //This is printing
        return String.class;
    }

    @Override
    public void handleFrame(StompHeaders stompHeaders, Object o) {
        //THIS BLOCK IS NOT GETTING INVOKED AT ALL
        System.out.println("Object Recieved : " + new String((byte[]) o));
        blockingQueue.offer(new String((byte[]) o));
    }
}

public void main(String... args) throws Exception {
        //...

        StompSession session = stompService.createStompWSConnection("http://localhost:8090" + StompService.WS_URL_SUFFIX);
        System.out.println("Session = " + session.getSessionId());
        String watchId = "123";
        System.out.println("Watch Id : " + watchId);
        stompService.subscribeStreamData(session, watchId, "WAVEFORMS", new DefaultStompFrameHandler());
    }

public class SessionHandler implements StompSessionHandler {
    @Override
    public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
        System.out.println("Connected!"); //GETTING INVOKED
    }

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return String.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        System.out.println("~~~~~");    
    }

    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload,
            Throwable exception) {
        System.out.println("~~~~~~~~~~~~~~~");
        //GETTING EXCEPTION : org.springframework.messaging.converter.MessageConversionException: No suitable converter for payload type [class [B] from handler type [class com.test.kymaloadtest.service.DefaultStompFrameHandler]

        //No suitable converter for payload type [class [B] from handler type [class com.test.kymaloadtest.service.DefaultStompFrameHandler]
    }

    @Override
    public void handleTransportError(StompSession session, Throwable exception){
        System.out.println("~~~~~~~~~~~~~~~");      
    }
}

But the simulating the same client works in NodeJS. I tried my best but was not able to figure out why I am only getting headers but not the message content/data.


This is what I see in console :

[DEBUG] 2019-07-25 20:09:19.144 [WebSocketClient-AsyncIO-1] DefaultStompFrameHandler - getPayloadType() :: stompHeaders = {destination=[/topic/WAVEFORMS/d7e0b709-0150-4b93-b9d3-0c279c69c929], content-type=[application/json;charset=UTF-8], subscription=[0], message-id=[c60b1afba030419887768d39353f2b45-27320], content-length=[1794]}
[DEBUG] 2019-07-25 20:09:19.393 [WebSocketClient-AsyncIO-1] DefaultStompFrameHandler - getPayloadType() :: stompHeaders = {destination=[/topic/WAVEFORMS/d7e0b709-0150-4b93-b9d3-0c279c69c929], content-type=[application/json;charset=UTF-8], subscription=[0], message-id=[c60b1afba030419887768d39353f2b45-27321], content-length=[1748]}
[DEBUG] 2019-07-25 20:09:19.645 [WebSocketClient-AsyncIO-1] DefaultStompFrameHandler - getPayloadType() :: stompHeaders = {destination=[/topic/WAVEFORMS/d7e0b709-0150-4b93-b9d3-0c279c69c929], content-type=[application/json;charset=UTF-8], subscription=[0], message-id=[c60b1afba030419887768d39353f2b45-27322], content-length=[1723]}

Thank you.


Solution

  • The issue is resolved by making the following edits :

    [1] - By using SimpleMessageConverter : stompClient.setMessageConverter(new SimpleMessageConverter());
    [2] - Changing the return type in SessionHandler : return byte[].class;
    

    public StompSession createStompWSConnection(String socketURL, StompSessionHandler sessionHandler) throws Exception  {
            List<Transport> transports = new ArrayList<Transport>();
            transports.add(new WebSocketTransport(new StandardWebSocketClient()));
    //      transports.add(new RestTemplateXhrTransport());
    
            SockJsClient sockJsClient = new SockJsClient(transports);
            WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
            stompClient.setMessageConverter(new SimpleMessageConverter());
    
            StompSession session = stompClient.connect(socketURL,sessionHandler).get();
            System.out.println("Session : " + session);
            return session;
        }
    

    @Override
        public Type getPayloadType(StompHeaders headers) {
    //      log.info("getPayloadType() :: stompHeaders = " + headers);
            return byte[].class;
        }