Search code examples
javaspring-bootsocketstcpnetty

Handling STX-ETX frame with Netty


Have to implement the following TCP/IP protocol implementation with Netty:

Message structure:
The messages are embedded in an STX-ETX frame:

STX    MESSAGE      ETX
0x02   7b20224d...  0x03   

An `escaping` of STX and ETX within the message is not necessary since it is in JSON format

Escape sequence are following:

JSON.stringify ({"a": "\ x02 \ x03 \ x10"}) → "{" a \ ": " \ u0002 \ u0003 \ u0010 \ "}".

Here is more info about STX, ETX control codes.

Length of the message could be different and it will have JSON format, something like:

\0x02{"messageID": "Heartbeat"}\0x03

My idea was made a combination of custom Frame delimiter with StringEncoder/StringDecoder.

For custom Frame delimiter -> use 0x03 as a delimiter and skip the first byte (0x02).

So created the following FrameDelimiterDecoder:

@Slf4j
public class FrameDelimiterDecoder extends DelimiterBasedFrameDecoder {

    public FrameDelimiterDecoder(int maxFrameLength, ByteBuf delimiter) {
        super(maxFrameLength, delimiter);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        ByteBuf buffFrame = null;

        Object frame = super.decode(ctx, buffer);
        if (frame instanceof ByteBuf) {
            buffFrame = (ByteBuf) frame;
        } else {
            log.info("frame: {}", frame);
        }

        if (buffFrame != null) {
            buffFrame.writeBytes(buffer.skipBytes(1));
        } else {
            log.warn("buffer is <null>");
        }

        return buffFrame;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause.getMessage(), cause);
    }
}

And use it for initialisation:

@Slf4j
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final StringEncoder stringEncoder = new StringEncoder();
    private final StringDecoder stringDecoder = new StringDecoder();

    private final QrReaderProcessingHandler readerServerHandler;
    private final NettyProperties nettyProperties;

    @Override
    protected void initChannel(SocketChannel socketChannel) {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast(new FrameDelimiterDecoder(1024 * 1024, Unpooled.wrappedBuffer(FrameConstant.ETX)));

        if (nettyProperties.isEnableTimeout()) {
            pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
        }
        pipeline.addLast(stringDecoder);
        pipeline.addLast(stringEncoder);
        pipeline.addLast(readerServerHandler);
    }
}

However, it always fails with:

c.s.netty.init.FrameDelimiterDecoder     : java.lang.IndexOutOfBoundsException: readerIndex(28) + length(1) exceeds writerIndex(28): PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 1024)

io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(28) + length(1) exceeds writerIndex(28): PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 1024)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:471)

Could not understand what is missing there.

How to process STX-ETX frame for request/response with Netty?


Solution

  • After try and fails finally, solved this issue.

    Rethink the code for FrameDelimiterDecoder and found the way how to do it with an array of bytes and at the end convert to ByteBuf. I believe it could be done with the buffer directly or to use ByteBuffer from NIO package and then convert.

    The simplest for me was:

    @Slf4j
    public class FrameDelimiterDecoder extends DelimiterBasedFrameDecoder {
        public FrameDelimiterDecoder(int maxFrameLength, ByteBuf delimiter) {
            super(maxFrameLength, delimiter);
        }
    
        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) {
            boolean inMessage = false;
    
            int size = buffer.readableBytes();
    
            ByteBuffer byteBuffer = ByteBuffer.allocate(size);
            buffer.readBytes(byteBuffer);
    
            byte[] byteArray = new byte[size - 2];
            byte[] data = byteBuffer.array();
    
            int index = 0;
            for (byte b : data) {
                if (b == FrameConstant.START_OF_TEXT) {
                    if (!inMessage) {
                        inMessage = true;
                    } else {
                        log.warn("Unexpected STX received!");
                    }
                } else if (b == FrameConstant.END_OF_TEXT) {
                    if (inMessage) {
                        inMessage = false;
                    } else {
                        log.warn("Unexpected ETX received!");
                    }
                } else {
                    if (inMessage) {
                        byteArray[index] = b;
                        index += 1;
                    }
                }
            }
    
            return Unpooled.wrappedBuffer(byteArray);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            if (cause instanceof InterruptedException) {
                log.warn("interrupted exception occurred");
                Thread.currentThread().interrupt();
            } else {
                log.error("FrameDelimiterEncoder exception occurred:", cause);
            }
        }
    }
    

    Where FrameConstant look like:

    @UtilityClass
    public class FrameConstant {
        public final int START_OF_TEXT = 0x02;
        public final int END_OF_TEXT = 0x03;
    
        public final int MAX_FRAME_LENGTH = 1024 * 1024;
    }
    

    Then initialize it:

    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final StringEncoder stringEncoder = new StringEncoder();
        private final StringDecoder stringDecoder = new StringDecoder();
    
        private final QrReaderProcessingHandler readerServerHandler;
        private final NettyProperties nettyProperties;
    
        @Override
        protected void initChannel(SocketChannel socketChannel) {
            ChannelPipeline pipeline = socketChannel.pipeline();
    
            // Add the delimiter first
            pipeline.addLast(getDelimiterDecoder());
    
            if (nettyProperties.isEnableTimeout()) {
                pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
            }
            pipeline.addLast(stringDecoder);
            pipeline.addLast(stringEncoder);
            pipeline.addLast(readerServerHandler);
        }
    
        private FrameDelimiterDecoder getDelimiterDecoder() {
            ByteBuf delimiter = Unpooled.wrappedBuffer(new byte[]{FrameConstant.END_OF_TEXT});
            return new FrameDelimiterDecoder(FrameConstant.MAX_FRAME_LENGTH, delimiter);
        }
    }
    

    And some modification for handler:

    @Slf4j
    @Component
    @RequiredArgsConstructor
    @ChannelHandler.Sharable
    public class QrReaderProcessingHandler extends ChannelInboundHandlerAdapter {
        private final PermissionService permissionService;
        private final EntranceService entranceService;
        private final Gson gson;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            String remoteAddress = ctx.channel().remoteAddress().toString();
            String stringMsg = (String) msg;
    
            if (log.isDebugEnabled()) {
                log.debug("CLIENT_IP: {}", remoteAddress);
                log.debug("CLIENT_REQUEST: {}", stringMsg);
            }
    
            if (HEARTBEAT.containsName(stringMsg)) {
                HeartbeatResponse heartbeatResponse = buildHeartbeatResponse();
                sendResponse(ctx, heartbeatResponse);
    
            } 
        }
    
        private <T> void sendResponse(ChannelHandlerContext ctx, T response) {
            ctx.writeAndFlush(formatResponse(response));
        }
    
        private <T> String formatResponse(T response) {
            String realResponse = String.format("%s%s%s",
                    (char) FrameConstant.START_OF_TEXT,
                    gson.toJson(response),
                    (char) FrameConstant.END_OF_TEXT);
    
            log.debug("response: {}", realResponse);
            return realResponse;
        }
    

    And finally, it sends correctly formed response back:

    enter image description here