Vert.x NetServer control read flow


I am trying to mimic a TCP server for tests with Vert.x, based on existing infrastructure that I have to work with.

The server I am mimicking works completely asynchronously and determines the length of each incoming request from a pre-header at the start of the buffer.

I need to read the first 6 characters of the incoming request on each client socket that connects to my mock TCP server. From this pre-header I read the actual length of the request (e.g. for xx3018, I know the full length of the request is 3018).

Then I need to read the rest of the buffer according to the length, match it to a map of responses and return the right response for the request.
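
To make the framing rule concrete, here is a minimal sketch of the pre-header arithmetic and the response lookup. It assumes the layout implied by the example above: a two-character prefix followed by four ASCII digits, where the number is the total request length including the 6-character pre-header. The class name, the helper names and the "UNKNOWN_REQUEST" fallback are made up for illustration.

```java
import java.util.Map;

class PreHeaderExample {
    static final int PRE_HEADER_SIZE = 6;

    /** Parses the total request length from a pre-header such as "xx3018". */
    static int totalLength(String preHeader) {
        if (preHeader.length() != PRE_HEADER_SIZE) {
            throw new IllegalArgumentException("Pre-header must be 6 characters: " + preHeader);
        }
        // Skip the two-character prefix; the remaining four digits are the length.
        return Integer.parseInt(preHeader.substring(2));
    }

    /** Number of bytes still to read once the pre-header has been consumed. */
    static int bodyLength(String preHeader) {
        return totalLength(preHeader) - PRE_HEADER_SIZE;
    }

    /** Looks up the canned response for a full request (pre-header + body). */
    static String respond(Map<String, String> responses, String fullRequest) {
        // Hypothetical fallback value for requests that are not in the map.
        return responses.getOrDefault(fullRequest, "UNKNOWN_REQUEST");
    }

    public static void main(String[] args) {
        System.out.println(totalLength("xx3018")); // 3018
        System.out.println(bodyLength("xx3018"));  // 3012

        Map<String, String> responses = Map.of("xx0010PING", "xx0010PONG");
        System.out.println(respond(responses, "xx0010PING")); // xx0010PONG
    }
}
```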

Here is a working mock server in plain Java (a quick implementation so that other development isn't blocked :) ):

public void run(String... args) throws Exception {
    log.info("Starting TCP Server");

    ServerSocket serverSocket = new ServerSocket(1750);

    while (true) {
        try {
            Socket socket = serverSocket.accept();

            CompletableFuture.runAsync(() -> {
                Exception e = null;
                while (e == null) {
                    try {
                        InputStream inputStream = socket.getInputStream();
                        OutputStream outputStream = socket.getOutputStream();

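                        // read() may return fewer bytes than requested, so use
                        // readNBytes (Java 9+), which blocks until the array is full or EOF
                        byte[] preHeader = new byte[6];
                        if (inputStream.readNBytes(preHeader, 0, preHeader.length) < preHeader.length) {
                            throw new java.io.EOFException("Connection closed before a full pre-header arrived");
                        }

                        String preHeaderValue = new String(preHeader);
                        log.info("Pre header: {}", preHeaderValue);

                        // the length in the pre-header includes the 6-byte pre-header itself
                        int length = Integer.parseInt(preHeaderValue.substring(2));
                        log.info("Request full length: {}", length);
                        byte[] request = new byte[length - 6];

                        if (inputStream.readNBytes(request, 0, request.length) < request.length) {
                            throw new java.io.EOFException("Connection closed before the full request arrived");
                        }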

                        String requestValue = new String(request);

                        log.info("Request: {}", requestValue);

                        String response = this.requestResponseProvider.getResponse(preHeaderValue + requestValue);
                        log.info("Response: {}", response);
                        outputStream.write(response.getBytes());
                    } catch (Exception ex) {
                        // log the caught exception; `e` is still null at this point
                        log.error("Encountered a problem: {}", ex.getMessage());
                        e = ex;
                    }
                }
            });
        } catch (Exception e) {
            log.error("Encountered a problem: {}", e.getMessage());
        }
    }
}

I can't seem to find a way to control the input stream with Vert.x the same way I control it in plain Java.


Solution

  • After a very long time of leaving this issue aside, I decided to play with it a bit.

    I remembered using the following module for a different project: https://github.com/vert-x3/vertx-tcp-eventbus-bridge

    I also remembered that the TCP bridge's internal protocol prepends the length of the payload to each buffer it sends, so I looked into the source code to find out how it handles chunks (aka frames).

    I found the following: https://github.com/vert-x3/vertx-tcp-eventbus-bridge/blob/master/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java which does exactly what I wanted to achieve :)

    I modified it a bit, converted to Kotlin, and made it so I can control the header size and the way it extracts the payload length.

    The following is a rough, quick-and-dirty example of controlling the read flow with Vert.x NetServer:

    suspend fun main() {
      val vertx = Vertx.vertx()
      initServer(vertx)
      initClient(vertx)
    }
    
    suspend fun initServer(vertx: Vertx) {
      val server = vertx.createNetServer(netServerOptionsOf(port = 8888, host = "localhost"))
    
      server
        .connectHandler { socket ->
          val parser = FrameParser(
            headerSize = 4,
            headerHandler = {
              it.getInt(0)
            },
            handler = {
              println(it.toString())
              println("---")
            }
          )
          socket.handler(parser)
    
          socket.exceptionHandler {
            it.printStackTrace()
            socket.close()
          }
        }
        .listenAwait()
    }
    
    suspend fun initClient(vertx: Vertx) {
      val client = vertx.createNetClient()
      val socket = client.connectAwait(port = 8888, host = "localhost")
    
      val message = "START|${"foobarfoobar".repeat(100)}|END"
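      // the header carries the payload size in bytes, not characters
      // (identical here because the message is pure ASCII)
      val length = message.toByteArray().size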
      repeat(5) {
        repeat(100) {
          vertx.setPeriodic(10) {
            socket.write(
              Buffer.buffer()
                .appendInt(length)
                .appendString(message)
            )
          }
        }
        delay(1000)
      }
    }
    
    /**
     * Based on: https://github.com/vert-x3/vertx-tcp-eventbus-bridge/blob/master/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java
     */
    class FrameParser(
      private val headerSize: Int,
      private val headerHandler: (Buffer) -> Int,
      private val handler: (Buffer) -> Unit
    ) : Handler<Buffer?> {
    
      private var _buffer: Buffer? = null
      private var _offset = 0
    
      override fun handle(buffer: Buffer?) {
        append(buffer)
        var offset: Int
        while (true) {
          // set a rewind point. if a failure occurs,
          // wait for the next handle()/append() and try again
          offset = _offset
    
          // how many bytes are in the buffer
          val remainingBytes = bytesRemaining()
    
          // at least expected header size
          if (remainingBytes < headerSize) {
            break
          }
    
          // what is the length of the message
          val length: Int = headerHandler(_buffer!!.getBuffer(_offset, _offset + headerSize))
          _offset += headerSize
          if (remainingBytes - headerSize >= length) {
            // we have a complete message
            handler(_buffer!!.getBuffer(_offset, _offset + length))
            _offset += length
          } else {
            // not enough data: rewind, and wait
            // for the next packet to appear
            _offset = offset
            break
          }
        }
      }
    
      private fun append(newBuffer: Buffer?) {
        if (newBuffer == null) {
          return
        }
    
        // first run
        if (_buffer == null) {
          _buffer = newBuffer
          return
        }
    
        // out of data
        if (_offset >= _buffer!!.length()) {
          _buffer = newBuffer
          _offset = 0
          return
        }
    
        // leftover bytes from an incomplete frame: drop what was
        // already consumed, then append the new chunk
        if (_offset > 0) {
          _buffer = _buffer!!.getBuffer(_offset, _buffer!!.length())
        }
        _buffer!!.appendBuffer(newBuffer)
        _offset = 0
      }
    
      private fun bytesRemaining(): Int {
        // null-safe: handle(null) before any data has arrived must not throw
        val available = (_buffer?.length() ?: 0) - _offset
        return maxOf(available, 0)
      }
    }
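
The example above frames messages with a 4-byte binary length that counts only the payload. The protocol in the question is slightly different: a 6-character ASCII pre-header whose number is the total length, pre-header included. As a rough sketch of how the same accumulate-and-rewind idea could be applied to that format, here is a plain Java version with no Vert.x dependency. The class name and the `Consumer<String>` callback are assumptions made for illustration, not part of any library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Consumer;

class PreHeaderFrameParser {
    private static final int PRE_HEADER_SIZE = 6;

    private final Consumer<String> requestHandler;
    // Bytes received so far that do not yet form a complete request.
    private byte[] pending = new byte[0];

    PreHeaderFrameParser(Consumer<String> requestHandler) {
        this.requestHandler = requestHandler;
    }

    /** Feed one chunk exactly as it arrived from the socket. */
    void handle(byte[] chunk) {
        // Join the leftover bytes with the new chunk.
        byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);

        int offset = 0;
        while (joined.length - offset >= PRE_HEADER_SIZE) {
            String preHeader = new String(joined, offset, PRE_HEADER_SIZE, StandardCharsets.US_ASCII);
            // Assumed layout: two prefix characters, then four digits holding the
            // total length. parseInt throws NumberFormatException on garbage input.
            int totalLength = Integer.parseInt(preHeader.substring(2));
            if (totalLength < PRE_HEADER_SIZE) {
                throw new IllegalStateException("Length smaller than pre-header: " + preHeader);
            }
            if (joined.length - offset < totalLength) {
                break; // incomplete request: keep the bytes and wait for the next chunk
            }
            // Complete request (pre-header + body): hand it over and advance.
            requestHandler.accept(new String(joined, offset, totalLength, StandardCharsets.US_ASCII));
            offset += totalLength;
        }
        // Keep only the unconsumed tail for the next call.
        pending = Arrays.copyOfRange(joined, offset, joined.length);
    }

    public static void main(String[] args) {
        PreHeaderFrameParser parser = new PreHeaderFrameParser(System.out::println);
        // Two requests split across three arbitrary chunks.
        parser.handle("xx00".getBytes(StandardCharsets.US_ASCII));
        parser.handle("10PINGxx0012HE".getBytes(StandardCharsets.US_ASCII)); // prints xx0010PING
        parser.handle("LLO!".getBytes(StandardCharsets.US_ASCII));           // prints xx0012HELLO!
    }
}
```

Inside a Vert.x `connectHandler`, one such parser per socket could presumably be fed with `socket.handler(buffer -> parser.handle(buffer.getBytes()))`, with the callback looking up the response and writing it back to the same socket.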