Search code examples
java websocket java-websocket

Java 21 websocket message not received


Why does this code not receive any messages?

/**
 * Connects to the CoinEx futures WebSocket endpoint, sends a {@code state.subscribe} request for
 * BTCUSDT, and blocks until the listener reports a text message, a close, or an error.
 */
public class MainClass {
    /**
     * Opens the WebSocket and waits for the listener to release the latch.
     *
     * @param args unused
     * @throws Exception if the connection cannot be established or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        try (HttpClient client = HttpClient.newHttpClient()) {
            client.newWebSocketBuilder()
                    .buildAsync(URI.create("wss://socket.coinex.com/v2/futures"), new WebSocketClient(latch))
                    .join();
            // CountDownLatch.await() is the blocking call. Object.wait() would throw
            // IllegalMonitorStateException here because this thread holds no monitor on the latch.
            latch.await();
        }
    }

    /**
     * Listener that prints each event and counts the latch down on text, close, and error events.
     *
     * @param latch released when the first text message, close, or error is observed
     */
    private record WebSocketClient(CountDownLatch latch) implements WebSocket.Listener {

        /** Sends the subscription request, then delegates to the default {@code onOpen}. */
        @Override
        public void onOpen(WebSocket webSocket) {
            System.out.println("Connected to server");

            String message = """
                    {
                        "method": "state.subscribe",
                        "params": {"market_list": ["BTCUSDT"]},
                        "id": 1
                    }
                    """;

            webSocket.sendText(message, true);
            WebSocket.Listener.super.onOpen(webSocket);
        }

        /** Prints the received text, releases the latch, and delegates to the default handler. */
        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            System.out.println("Receive: " + data.toString());
            latch.countDown();
            return WebSocket.Listener.super.onText(webSocket, data, last);
        }

        /** Prints the close status code, releases the latch, and delegates to the default handler. */
        @Override
        public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
            System.out.println("Socket Closed: " + statusCode);
            latch.countDown();
            return WebSocket.Listener.super.onClose(webSocket, statusCode, reason);
        }

        /** Prints the error message, releases the latch, and delegates to the default handler. */
        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            System.out.println("Error: " + error.getMessage());
            latch.countDown();
            WebSocket.Listener.super.onError(webSocket, error);
        }
    }
}

I checked the connection with this command:

websocat --uncompress-gzip --binary wss://socket.coinex.com/v2/futures
# After connecting, enter this message to receive a response:
{"method": "state.subscribe","params": {"market_list": ["BTCUSDT"]},"id": 1}

I know the messages have to be decompressed in the onText() method,
but my problem is that this method does not receive any data from the server!

CoinEx Document


Solution

  • As indicated by your websocat command, the API is sending binary messages, not text messages. So, you should be overriding onBinary to process the messages instead of onText. Also as indicated by your websocat command, you'll have to decompress the data. Your listener might look something like:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.net.http.WebSocket;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.CompletionStage;
    import java.util.function.Consumer;
    import java.util.zip.GZIPInputStream;
    
    /**
     * WebSocket listener that sends a {@code state.subscribe} request for BTCUSDT when the
     * connection opens and passes each gzip-compressed binary message, decoded as UTF-8 text, to a
     * callback.
     *
     * <p>{@code onError} and {@code onClose} are not overridden.
     */
    public class FuturesListener implements WebSocket.Listener {

      private final Consumer<String> onMessage;

      /**
       * Bytes of the binary message currently being received. Only one invocation is requested at
       * a time (see the {@code request(1)} calls), so no extra synchronization is added here.
       */
      private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

      /**
       * @param onMessage callback that receives each complete, decompressed message as text
       */
      public FuturesListener(Consumer<String> onMessage) {
        this.onMessage = onMessage;
      }

      /** Sends the subscription request and asks for the first incoming message. */
      @Override
      public void onOpen(WebSocket webSocket) {
        System.out.println("onOpen");
        webSocket.sendText(
            """
              {
                "method": "state.subscribe",
                "params": {"market_list": ["BTCUSDT"]},
                "id": 1
              }
              """,
            true);
        webSocket.request(1);
      }

      /**
       * Buffers the received fragment and, once the final fragment has arrived, decompresses the
       * whole message and passes it to the callback.
       *
       * @param webSocket the socket the data arrived on
       * @param data one fragment of a binary message
       * @param last {@code true} if this fragment completes the message
       * @return {@code null}; the fragment is copied before returning, so the buffer is not retained
       * @throws UncheckedIOException if the completed message is not valid gzip data
       */
      @Override
      public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
        byte[] fragment = new byte[data.remaining()];
        data.get(fragment);
        pending.write(fragment, 0, fragment.length);
        if (last) {
          // A message may arrive in several parts; a gzip stream can only be decoded once whole.
          byte[] compressed = pending.toByteArray();
          // Reset before decoding so a corrupt message cannot leak into the next one.
          pending.reset();
          onMessage.accept(new String(decompress(compressed), StandardCharsets.UTF_8));
        }
        webSocket.request(1);
        return null;
      }

      /** Inflates a complete gzip payload, closing the stream when done. */
      private byte[] decompress(byte[] compressed) {
        try (var stream = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
          return stream.readAllBytes();
        } catch (IOException ex) {
          throw new UncheckedIOException(ex);
        }
      }
    }
    

    Note the above omits any onError and onClose handling.