I'm trying to transfer files through my netty implementation. My custom Decoder and Encoder both know two types of Objects: String
and FileChunk, which actually contains the index of the chunk and its content as a byte[]
. The transfer works like:
String
containing the path where the file should be saved, how large it is, how many slices will be sent etc.This progress should keep on going until the file is transfered. And it works. If I add a 1 second delay before sending a 64kb slice of the file! Derp.
Seems like there is no mistake - but it does not work under heavy load and without blocking threads. Do I need to clear buffers anywhere or do I need a copy? Please help... If you are interessted in an example project with IntelliJ and Maven let me know in the comments, I will make one ready.
Explained enough. Here is code!
FileTransfer Runnable
public class FileTransfer implements Runnable {
private FileSlicer slicer;
private Client client;
public FileTransfer(FileSlicer slicer, Client client) {
this.slicer = slicer;
this.client = client;
}
public void run() {
synchronized(this) {
while(slicer.hasNext()) {
try {
client.getContext().writeAndFlush(slicer.getNextSlice());
this.wait(); //Unblocked when success packet received, works
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
Channel-Initializer (Buffer size is set here, should not overflow by default)
@Override
protected void initChannel(Channel channel) throws Exception {
channel.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(1024 * 65));
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new PacketDecoder());
pipeline.addLast(new PacketEncoder());
pipeline.addLast(new ChannelEncoder());
pipeline.addLast(new ServerHandler());
}
Decoder (detects if it is a String or FileChunk and parses it):
public class PacketDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> output) throws Exception {
int type = buf.readInt();
if (buf.readableBytes() <= 0) return;
byte[] buffer;
switch (type) {
case 0:
buffer = buf.readBytes(buf.readInt()).array();
output.add(new String(buffer));
break;
case 1:
int read = buf.readInt();
buffer = buf.readBytes(buf.readInt()).array();
output.add(new FileChunk(buffer, read));
break;
default:
System.out.println("Unknown Decodec.");
break;
}
}
}
Stacktrace:
io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(12) + length(65536) exceeds writerIndex(40960): PooledUnsafeDirectByteBuf(ridx: 12, widx: 40960, cap: 66560)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:347)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
at io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:331)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
...
The decode function assumes the entire file is available, however the buffer may only contain part of the data, depending on how much of the stream was received.
One way to address this is by adding a frame decoder, such as LengthFieldBasedFrameDecoder, which segments the stream according to a length field in the message, such as in your example.
Another option is to use the same approach as the File Server example in the docs: https://netty.io/4.1/xref/io/netty/example/http/file/package-summary.html