Have to implement the following TCP/IP protocol implementation with Netty:
Message structure:
The messages are embedded in an STX-ETX frame:
STX MESSAGE ETX
0x02 7b20224d... 0x03
An `escaping` of STX and ETX within the message is not necessary since it is in JSON format
Escape sequence are following:
JSON.stringify ({"a": "\ x02 \ x03 \ x10"}) → "{" a \ ": " \ u0002 \ u0003 \ u0010 \ "}".
Here is more info about STX, ETX control codes.
Length of the message could be different and it will have JSON format, something like:
\0x02{"messageID": "Heartbeat"}\0x03
My idea was made a combination of custom Frame delimiter with StringEncoder/StringDecoder.
For custom Frame delimiter -> use 0x03
as a delimiter and skip the first byte (0x02).
So created the following FrameDelimiterDecoder
:
@Slf4j
public class FrameDelimiterDecoder extends DelimiterBasedFrameDecoder {
public FrameDelimiterDecoder(int maxFrameLength, ByteBuf delimiter) {
super(maxFrameLength, delimiter);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
ByteBuf buffFrame = null;
Object frame = super.decode(ctx, buffer);
if (frame instanceof ByteBuf) {
buffFrame = (ByteBuf) frame;
} else {
log.info("frame: {}", frame);
}
if (buffFrame != null) {
buffFrame.writeBytes(buffer.skipBytes(1));
} else {
log.warn("buffer is <null>");
}
return buffFrame;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error(cause.getMessage(), cause);
}
}
And use it for initialisation:
@Slf4j
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {
private final StringEncoder stringEncoder = new StringEncoder();
private final StringDecoder stringDecoder = new StringDecoder();
private final QrReaderProcessingHandler readerServerHandler;
private final NettyProperties nettyProperties;
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new FrameDelimiterDecoder(1024 * 1024, Unpooled.wrappedBuffer(FrameConstant.ETX)));
if (nettyProperties.isEnableTimeout()) {
pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
}
pipeline.addLast(stringDecoder);
pipeline.addLast(stringEncoder);
pipeline.addLast(readerServerHandler);
}
}
However, it always fails with:
c.s.netty.init.FrameDelimiterDecoder : java.lang.IndexOutOfBoundsException: readerIndex(28) + length(1) exceeds writerIndex(28): PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 1024)
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(28) + length(1) exceeds writerIndex(28): PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 1024)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:471)
Could not understand what is missing there.
How to process STX-ETX frame for request/response with Netty?
After try and fails finally, solved this issue.
Rethink the code for FrameDelimiterDecoder
and found the way how to do it with an array of bytes and at the end convert to ByteBuf
. I believe it could be done with the buffer directly or to use ByteBuffer from NIO package and then convert.
The simplest for me was:
@Slf4j
public class FrameDelimiterDecoder extends DelimiterBasedFrameDecoder {
public FrameDelimiterDecoder(int maxFrameLength, ByteBuf delimiter) {
super(maxFrameLength, delimiter);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) {
boolean inMessage = false;
int size = buffer.readableBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(size);
buffer.readBytes(byteBuffer);
byte[] byteArray = new byte[size - 2];
byte[] data = byteBuffer.array();
int index = 0;
for (byte b : data) {
if (b == FrameConstant.START_OF_TEXT) {
if (!inMessage) {
inMessage = true;
} else {
log.warn("Unexpected STX received!");
}
} else if (b == FrameConstant.END_OF_TEXT) {
if (inMessage) {
inMessage = false;
} else {
log.warn("Unexpected ETX received!");
}
} else {
if (inMessage) {
byteArray[index] = b;
index += 1;
}
}
}
return Unpooled.wrappedBuffer(byteArray);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (cause instanceof InterruptedException) {
log.warn("interrupted exception occurred");
Thread.currentThread().interrupt();
} else {
log.error("FrameDelimiterEncoder exception occurred:", cause);
}
}
}
Where FrameConstant
look like:
@UtilityClass
public class FrameConstant {
public final int START_OF_TEXT = 0x02;
public final int END_OF_TEXT = 0x03;
public final int MAX_FRAME_LENGTH = 1024 * 1024;
}
Then initialize it:
@Slf4j
@Component
@RequiredArgsConstructor
public class QrReaderChannelInitializer extends ChannelInitializer<SocketChannel> {
private final StringEncoder stringEncoder = new StringEncoder();
private final StringDecoder stringDecoder = new StringDecoder();
private final QrReaderProcessingHandler readerServerHandler;
private final NettyProperties nettyProperties;
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
// Add the delimiter first
pipeline.addLast(getDelimiterDecoder());
if (nettyProperties.isEnableTimeout()) {
pipeline.addLast(new ReadTimeoutHandler(nettyProperties.getClientTimeout()));
}
pipeline.addLast(stringDecoder);
pipeline.addLast(stringEncoder);
pipeline.addLast(readerServerHandler);
}
private FrameDelimiterDecoder getDelimiterDecoder() {
ByteBuf delimiter = Unpooled.wrappedBuffer(new byte[]{FrameConstant.END_OF_TEXT});
return new FrameDelimiterDecoder(FrameConstant.MAX_FRAME_LENGTH, delimiter);
}
}
And some modification for handler:
@Slf4j
@Component
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class QrReaderProcessingHandler extends ChannelInboundHandlerAdapter {
private final PermissionService permissionService;
private final EntranceService entranceService;
private final Gson gson;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String remoteAddress = ctx.channel().remoteAddress().toString();
String stringMsg = (String) msg;
if (log.isDebugEnabled()) {
log.debug("CLIENT_IP: {}", remoteAddress);
log.debug("CLIENT_REQUEST: {}", stringMsg);
}
if (HEARTBEAT.containsName(stringMsg)) {
HeartbeatResponse heartbeatResponse = buildHeartbeatResponse();
sendResponse(ctx, heartbeatResponse);
}
}
private <T> void sendResponse(ChannelHandlerContext ctx, T response) {
ctx.writeAndFlush(formatResponse(response));
}
private <T> String formatResponse(T response) {
String realResponse = String.format("%s%s%s",
(char) FrameConstant.START_OF_TEXT,
gson.toJson(response),
(char) FrameConstant.END_OF_TEXT);
log.debug("response: {}", realResponse);
return realResponse;
}
And finally, it sends correctly formed response back: