Tags: java, netty, file-transfer

Netty file transfer causes exception


I'm trying to transfer files through my Netty implementation. My custom decoder and encoder both handle two types of objects: String and FileChunk, where a FileChunk holds the index of the chunk and its content as a byte[]. The transfer works like this:

  1. Client0 sends Client1 a JSON object, serialized as a String, containing the path where the file should be saved, its size, how many slices will be sent, etc.
  2. Client0 sends the first slice as a FileChunk and blocks the thread until Client1 confirms that it received the packet.
  3. Client1 receives the slice of the file and writes it to disk. Afterward, it sends a success packet to Client0.
  4. Client0 receives the success packet and sends the next slice.

This process should repeat until the file is transferred. And it works, but only if I add a 1-second delay before sending each 64 KB slice of the file! Derp.

There seems to be no mistake, but it does not work under heavy load or without blocking threads. Do I need to clear buffers somewhere, or do I need a copy? Please help... If you are interested in an example project with IntelliJ and Maven, let me know in the comments and I will prepare one.

Enough explanation. Here is the code!

FileTransfer Runnable

public class FileTransfer implements Runnable {

    private FileSlicer slicer;
    private Client client;

    public FileTransfer(FileSlicer slicer, Client client) {
        this.slicer = slicer;
        this.client = client;
    }

    public void run() {

        synchronized(this) {

            while(slicer.hasNext()) {

                try {
                    client.getContext().writeAndFlush(slicer.getNextSlice());
                    this.wait(); //Unblocked when success packet received, works
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        }
    }
}

Channel initializer (the receive buffer size is set here, so it should not overflow by default):

@Override
protected void initChannel(Channel channel) throws Exception {

    channel.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(1024 * 65));

    ChannelPipeline pipeline = channel.pipeline();
    pipeline.addLast(new PacketDecoder());
    pipeline.addLast(new PacketEncoder());
    pipeline.addLast(new ChannelEncoder());
    pipeline.addLast(new ServerHandler());

}

Decoder (detects if it is a String or FileChunk and parses it):

public class PacketDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> output) throws Exception {

        int type = buf.readInt();
        if (buf.readableBytes() <= 0) return;
        byte[] buffer;

        switch (type) {

            case 0:
                buffer = buf.readBytes(buf.readInt()).array();
                output.add(new String(buffer));
                break;

            case 1:
                int read = buf.readInt();
                buffer = buf.readBytes(buf.readInt()).array();
                output.add(new FileChunk(buffer, read));
                break;

            default:
                System.out.println("Unknown Decodec.");
                break;

        }

    }

}

Stacktrace:

io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(12) + length(65536) exceeds writerIndex(40960): PooledUnsafeDirectByteBuf(ridx: 12, widx: 40960, cap: 66560)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:347)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:230)
    at io.netty.channel.ChannelHandlerInvokerUtil.invokeChannelReadNow(ChannelHandlerInvokerUtil.java:84)
    at io.netty.channel.DefaultChannelHandlerInvoker.invokeChannelRead(DefaultChannelHandlerInvoker.java:153)
    at io.netty.channel.PausableChannelEventExecutor.invokeChannelRead(PausableChannelEventExecutor.java:86)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:389)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:956)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:331)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at io.netty.util.internal.chmv8.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1412)
    at io.netty.util.internal.chmv8.ForkJoinTask.doExec(ForkJoinTask.java:280)
    at io.netty.util.internal.chmv8.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:877)
    ...

Solution

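  • The decode function assumes that an entire message is available in the buffer. However, TCP delivers a byte stream, so each call to decode may see only part of a message, depending on how much of the stream has been received so far. Your stack trace shows exactly this: after the 12-byte header the decoder tries to read 65536 bytes, but only 40960 bytes have arrived.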

    One way to address this is to add a frame decoder, such as LengthFieldBasedFrameDecoder, ahead of your decoder in the pipeline. It splits the stream into complete frames according to a length field in the message, like the one your protocol already sends.

    Another option is to use the same approach as the File Server example in the docs: https://netty.io/4.1/xref/io/netty/example/http/file/package-summary.html
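
To illustrate the first point, here is a minimal sketch of the "wait until the whole message has arrived" check. It deliberately uses a plain java.nio.ByteBuffer instead of Netty so that it runs on its own. The class names FrameParser and Chunk are hypothetical stand-ins, and the wire format (type, then length for strings, or type, index, length for chunks) is assumed from the decoder in the question.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the FileChunk class from the question.
final class Chunk {
    final int index;
    final byte[] data;

    Chunk(int index, byte[] data) {
        this.index = index;
        this.data = data;
    }
}

// Accumulates received bytes and only emits messages that are complete.
final class FrameParser {
    // Kept in "write mode" between calls: position = number of buffered bytes.
    private ByteBuffer pending = ByteBuffer.allocate(128 * 1024);

    /** Feeds newly received bytes; returns every message that is now complete. */
    List<Object> feed(byte[] received) {
        if (pending.remaining() < received.length) {
            // Grow the buffer so the new bytes fit.
            int needed = pending.position() + received.length;
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(pending.capacity() * 2, needed));
            pending.flip();
            bigger.put(pending);
            pending = bigger;
        }
        pending.put(received);
        pending.flip(); // switch to read mode

        List<Object> out = new ArrayList<>();
        while (true) {
            pending.mark(); // remember where this message starts
            Object message = tryDecode(pending);
            if (message == null) {
                pending.reset(); // incomplete: rewind and wait for more bytes
                break;
            }
            out.add(message);
        }
        pending.compact(); // keep the unread tail, back to write mode
        return out;
    }

    // Returns null when the buffer does not yet hold a complete message.
    private static Object tryDecode(ByteBuffer buf) {
        if (buf.remaining() < 4) return null;
        int type = buf.getInt();
        if (type == 0) { // String: length, bytes
            if (buf.remaining() < 4) return null;
            byte[] body = readBody(buf, buf.getInt());
            return body == null ? null : new String(body, StandardCharsets.UTF_8);
        }
        if (type == 1) { // chunk: index, length, bytes
            if (buf.remaining() < 8) return null;
            int index = buf.getInt();
            byte[] body = readBody(buf, buf.getInt());
            return body == null ? null : new Chunk(index, body);
        }
        throw new IllegalStateException("Unknown packet type: " + type);
    }

    private static byte[] readBody(ByteBuffer buf, int length) {
        if (length < 0) throw new IllegalStateException("Negative length: " + length);
        if (buf.remaining() < length) return null; // payload not fully received
        byte[] body = new byte[length];
        buf.get(body);
        return body;
    }
}
```

With this sketch, feeding the first 40960 bytes of a 64 KB chunk (the situation in the stack trace) yields no message, and feeding the remaining bytes yields the chunk. Inside a ByteToMessageDecoder the same pattern would use buf.readableBytes(), buf.markReaderIndex() and buf.resetReaderIndex(), returning from decode without adding anything to the output list when the message is incomplete. Note also that ByteBuf.array() is only supported on heap buffers, so for the direct buffer shown in the stack trace the payload would have to be copied out with readBytes(byte[]) instead.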