java, sockets, spring-integration

STX/ETX serializer in Spring Integration


I'm working on an application that uses a socket to communicate with the server.

I use Spring Integration on the client side and a plain Java ServerSocket on the server side. Each message on the socket is framed with STX/ETX to mark its start and end; for this I use Spring's ByteArrayStxEtxSerializer class.

The problem is that when ByteArrayStxEtxSerializer is used for serialization, messages arrive on the server side (or vice versa) with some delay, and sometimes they are never received at all.

What confuses me is that when I run the same scenario with ByteArrayCrLfSerializer, the whole process works without any delay or failure.

My code snippets for the server side and the client side follow:

Server-side:

public void startSocketServer(){

  try (final ServerSocket serverSocket = new ServerSocket(9992)) {
       gl.info("Server is listening on: " + serverSocket.getLocalSocketAddress());

     while (true) {
         final Socket socket = serverSocket.accept();
         gl.info("A new client connected");
         new SocketThread(socket).start();
     }
  } catch (IOException e) {
    e.printStackTrace();
  }
}

private class SocketThread extends Thread {
  private final Socket socket;
  private final PrintWriter writer;
  private final BufferedReader reader;

  public SocketThread(Socket socket) throws IOException {
     this.socket = socket;
     InputStream input = socket.getInputStream();
     OutputStream output = socket.getOutputStream();

     reader = new BufferedReader(new InputStreamReader(input));
     writer = new PrintWriter(output, true);
  }
  public void run() {
     try {
         while (true) {
             String inputMessage = reader.readLine();
             if (inputMessage != null) {
                 MessageType messageType = getTypeInstance(inputMessage);

                if (messageType instanceof LoginMessage loginMessage) {
                     if (isAuthenticated(loginMessage.getUsername(), loginMessage.getPassword())) {
                        gl.info("#### SERVER => User authorized");
                        final String messageBody = createConnectionAckMessage();
                        print(writer, messageBody);
                     } else {
                         print(writer, createRefusalMessage());
                     }
                } else if (messageType instanceof StartTransferingData startData) {
                    getMessages().forEach(message-> print(writer, message));

                } else if (messageType instanceof DisconnectionAck disAck) {
                    print(writer, "By then");
                    break;
                }
            }
         }
         socket.close();
     } catch (IOException ex) {
         gl.info("Server exception: " + ex.getMessage());
     }
 }
 private void print(PrintWriter writer, String msg) {
     writer.print(ByteArrayStxEtxSerializer.STX);
     writer.print(msg);
     writer.print(ByteArrayStxEtxSerializer.ETX);
 } 
}

And the client-side:

public class CapConfig {

   @MessagingGateway(defaultRequestChannel = "toTcp", errorChannel = "errorChannel")
   public interface TcpGateway {
     @Gateway
     void send(String in);
   }

   @Bean
   public MessageChannel toTcp() {
     return new DirectChannel();
   }

   @Bean
   public AbstractClientConnectionFactory clientCF() {
      return Tcp.netClient("localhost", 9992)
          .serializer(TcpCodecs.stxetx())
          .deserializer(TcpCodecs.stxetx())
          .get();
   }

   @Bean
   public IntegrationFlow tcpOutFlow(AbstractClientConnectionFactory connectionFactory) {
      return IntegrationFlows.from(toTcp())
         .handle(Tcp.outboundAdapter(connectionFactory))
         .get();
   }

   @Bean
   public IntegrationFlow tcpInFlow(AbstractClientConnectionFactory connectionFactory) {
      return IntegrationFlows.from(Tcp.inboundAdapter(connectionFactory))
         .transform(stringTransformer)
         .log()
         //---- Do some other stuffs
         .get();
   } 
}

Solution

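  • The problem was reader.readLine(): the reader waits for a line terminator (\n, \r, or \r\n) to detect the end of a line. An STX/ETX frame contains none, so the call keeps blocking until a later line terminator arrives or the connection closes. That is also why ByteArrayCrLfSerializer worked: it ends every message with \r\n.

    Since I use STX/ETX to mark the start and the end of a message, I have to parse the frames myself.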

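            private String read() throws IOException {
               // Every frame must begin with STX (0x02).
               int bite = reader.read();
               if (bite != ByteArrayStxEtxSerializer.STX) {
                  throw new RuntimeException("Expected STX but read " + bite);
               }
               // Collect characters until ETX (0x03) closes the frame.
               StringBuilder result = new StringBuilder();
               while ((bite = reader.read()) != ByteArrayStxEtxSerializer.ETX) {
                  if (bite == -1) {
                     // read() returns -1 when the peer closes the connection.
                     throw new IOException("Stream closed before ETX");
                  }
                  result.append((char) bite);
               }

               return result.toString();
            }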
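
    The parsing described above can be sketched as a small, self-contained pair of helpers that works on plain streams, with no Spring dependency. The class name StxEtxFrames and its methods are hypothetical, and the constants are assumed to have the same values as ByteArrayStxEtxSerializer.STX and ETX (0x02 and 0x03). The payload is assumed to be UTF-8 text that never contains those two control bytes. This is a sketch under those assumptions, not the one correct implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for STX/ETX framing; not part of Spring Integration. */
final class StxEtxFrames {

    // Assumed to match ByteArrayStxEtxSerializer.STX and ETX.
    public static final int STX = 0x02;
    public static final int ETX = 0x03;

    private StxEtxFrames() {
    }

    /** Writes STX + payload + ETX, then flushes so the frame is sent right away. */
    public static void writeFrame(OutputStream out, String payload) throws IOException {
        out.write(STX); // write(int) emits one byte, not the text "2"
        out.write(payload.getBytes(StandardCharsets.UTF_8));
        out.write(ETX);
        out.flush();
    }

    /** Reads one frame; returns null if the stream ended before a new frame started. */
    public static String readFrame(InputStream in) throws IOException {
        int b = in.read();
        if (b == -1) {
            return null; // clean end of stream between frames
        }
        if (b != STX) {
            throw new IOException("Expected STX (0x02) but read " + b);
        }
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        while ((b = in.read()) != ETX) {
            if (b == -1) {
                throw new EOFException("Stream ended before ETX (0x03)");
            }
            payload.write(b);
        }
        return new String(payload.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Simulate the wire with an in-memory buffer instead of a socket.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeFrame(wire, "LOGIN");
        writeFrame(wire, "START");

        InputStream in = new ByteArrayInputStream(wire.toByteArray());
        System.out.println(readFrame(in)); // prints LOGIN
        System.out.println(readFrame(in)); // prints START
        System.out.println(readFrame(in)); // prints null (no more frames)
    }
}
```

    Two details on the writing side may also matter for the delays in the question. A PrintWriter created with autoflush flushes only on println, printf, and format, so a helper that calls only print() leaves data in the buffer unless it calls flush() explicitly. And if STX and ETX are int constants, as the comparison with the result of read() suggests, print(int) writes the decimal digits rather than the control character, whereas write(int) emits the single character. On a real socket, wrapping the input in a BufferedInputStream avoids one system call per byte.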