I am writing a client to test WebSocket connections and receive data.
After establishing the STOMP-over-WebSocket connection and subscribing, I only receive the message headers. I never get the message content/body, because handleFrame() is not invoked at all.
Instead, I am seeing MessageConversionExceptions. Please look at the snippets below; I have commented a few important pieces.
Snippets (WebSocket client):
public class StompService {
public static final String WS_URL_SUFFIX = "/monitoring-channel";
public StompSession createStompWSConnection(String socketURL) throws Exception {
List<Transport> transports = new ArrayList<Transport>();
transports.add(new WebSocketTransport(new StandardWebSocketClient()));
SockJsClient sockJsClient = new SockJsClient(transports);
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
stompClient.setMessageConverter(new StringMessageConverter());
//socketURL is "http://localhost:8090/monitoring-channel".. This works in the Node client
StompSession session = stompClient.connect(socketURL, new SessionHandler()).get();
System.out.println("Session : " + session);
return session;
}
public boolean subscribeStreamData(StompSession wsSession, String watchId, String type, StompFrameHandler handler) {
type = type.toUpperCase();
String topic = "/topic/" + type + "/" + watchId;
wsSession.subscribe(topic, handler);
return true;
}
}
public class DefaultStompFrameHandler implements StompFrameHandler {
private static final Logger log = LogManager.getLogger();
BlockingQueue<String> blockingQueue;
@Override
public Type getPayloadType(StompHeaders stompHeaders) {
log.debug("getPayloadType() :: stompHeaders = " + stompHeaders); //This is printing
return String.class;
}
@Override
public void handleFrame(StompHeaders stompHeaders, Object o) {
//THIS BLOCK IS NOT GETTING INVOKED AT ALL
System.out.println("Object Received : " + new String((byte[]) o));
blockingQueue.offer(new String((byte[]) o));
}
}
public void main(String... args) throws Exception {
//...
StompSession session = stompService.createStompWSConnection("http://localhost:8090" + StompService.WS_URL_SUFFIX);
System.out.println("Session = " + session.getSessionId());
String watchId = "123";
System.out.println("Watch Id : " + watchId);
stompService.subscribeStreamData(session, watchId, "WAVEFORMS", new DefaultStompFrameHandler());
}
public class SessionHandler implements StompSessionHandler {
@Override
public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
System.out.println("Connected!"); //GETTING INVOKED
}
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
System.out.println("~~~~~");
}
@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload,
Throwable exception) {
System.out.println("~~~~~~~~~~~~~~~");
//GETTING EXCEPTION : org.springframework.messaging.converter.MessageConversionException: No suitable converter for payload type [class [B] from handler type [class com.test.kymaloadtest.service.DefaultStompFrameHandler]
}
@Override
public void handleTransportError(StompSession session, Throwable exception){
System.out.println("~~~~~~~~~~~~~~~");
}
}
However, an equivalent client works in Node.js. I tried my best but could not figure out why I am only getting the headers and not the message content/data.
This is what I see in the console:
[DEBUG] 2019-07-25 20:09:19.144 [WebSocketClient-AsyncIO-1] DefaultStompFrameHandler - getPayloadType() :: stompHeaders = {destination=[/topic/WAVEFORMS/d7e0b709-0150-4b93-b9d3-0c279c69c929], content-type=[application/json;charset=UTF-8], subscription=[0], message-id=[c60b1afba030419887768d39353f2b45-27320], content-length=[1794]}
[DEBUG] 2019-07-25 20:09:19.393 [WebSocketClient-AsyncIO-1] DefaultStompFrameHandler - getPayloadType() :: stompHeaders = {destination=[/topic/WAVEFORMS/d7e0b709-0150-4b93-b9d3-0c279c69c929], content-type=[application/json;charset=UTF-8], subscription=[0], message-id=[c60b1afba030419887768d39353f2b45-27321], content-length=[1748]}
[DEBUG] 2019-07-25 20:09:19.645 [WebSocketClient-AsyncIO-1] DefaultStompFrameHandler - getPayloadType() :: stompHeaders = {destination=[/topic/WAVEFORMS/d7e0b709-0150-4b93-b9d3-0c279c69c929], content-type=[application/json;charset=UTF-8], subscription=[0], message-id=[c60b1afba030419887768d39353f2b45-27322], content-length=[1723]}
Thank you.
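The issue was resolved by making the following changes:
[1] - Using SimpleMessageConverter instead of StringMessageConverter : stompClient.setMessageConverter(new SimpleMessageConverter());
[2] - Changing the type returned by getPayloadType() in the handler : return byte[].class;
This likely works because the messages arrive as application/json, which StringMessageConverter (text/plain by default) does not convert, while SimpleMessageConverter passes the raw byte[] payload through when the handler asks for byte[].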
public StompSession createStompWSConnection(String socketURL, StompSessionHandler sessionHandler) throws Exception {
List<Transport> transports = new ArrayList<Transport>();
transports.add(new WebSocketTransport(new StandardWebSocketClient()));
// transports.add(new RestTemplateXhrTransport());
SockJsClient sockJsClient = new SockJsClient(transports);
WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
stompClient.setMessageConverter(new SimpleMessageConverter());
StompSession session = stompClient.connect(socketURL,sessionHandler).get();
System.out.println("Session : " + session);
return session;
}
@Override
public Type getPayloadType(StompHeaders headers) {
// log.info("getPayloadType() :: stompHeaders = " + headers);
return byte[].class;
}
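With getPayloadType() returning byte[].class, handleFrame() receives the raw bytes, so they still have to be decoded into a String. The snippet below is a minimal sketch of that step using only the JDK; it is not part of Spring. The class PayloadDecoder and the methods charsetOf() and decode() are hypothetical names introduced for illustration, and the fallback to UTF-8 is an assumption.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not a Spring class): decodes a raw STOMP payload
// using the charset named in the frame's content-type header.
final class PayloadDecoder {

    private PayloadDecoder() {
    }

    // Returns the charset named in a content-type value such as
    // "application/json;charset=UTF-8". Falls back to UTF-8 (an assumption)
    // when the parameter is missing or names an unknown charset.
    static Charset charsetOf(String contentType) {
        if (contentType != null) {
            for (String part : contentType.split(";")) {
                String param = part.trim();
                if (param.regionMatches(true, 0, "charset=", 0, 8)) {
                    String name = param.substring(8).trim().replace("\"", "");
                    try {
                        return Charset.forName(name);
                    } catch (IllegalArgumentException e) {
                        break; // unknown or malformed charset name
                    }
                }
            }
        }
        return StandardCharsets.UTF_8;
    }

    // Decodes the payload bytes with the charset taken from the content type.
    static String decode(byte[] payload, String contentType) {
        return new String(payload, charsetOf(contentType));
    }

    public static void main(String[] args) {
        byte[] body = "{\"value\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(body, "application/json;charset=UTF-8"));
    }
}
```

Inside handleFrame(), this could be called as PayloadDecoder.decode((byte[]) o, String.valueOf(stompHeaders.getContentType())), assuming the content-type header is present. Passing an explicit charset avoids depending on the platform default that new String(byte[]) would otherwise use.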