Below is the code of my netty server. It is configured to release reference count on channelRead i.e wont be processing anything just drop the incoming data. Client is also netty based. Which starts 16 parallel connections with server and start sending data on each channel. However as soon as program starts, memory usage keep increasing and eventually it crashes with following exception.
08:41:15.789 [nioEventLoopGroup-3-1] WARN i.n.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached a
t the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 100663296 byte(s) of direct memory (used: 3602907136, max: 369885184
0)
at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:640) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:594) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:764) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:754) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:260) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.buffer.PoolArena.allocate(PoolArena.java:231) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.buffer.PoolArena.reallocate(PoolArena.java:397) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:118) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:285) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:265) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1079) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1072) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1062) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:263) ~[sosagent.jar:1.0-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [sosagent.jar:1.0-
SNAPSHOT]
NettyServerHandler
public class AgentServerHandler extends ChannelInboundHandlerAdapter implements RequestListener {
private Buffer buffer;
private AgentToHost endHostHandler;
private String remoteAgentIP;
private int remoteAgentPort;
private ChannelHandlerContext context;
private float totalBytes;
private long startTime;
boolean called;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
log.debug("New agent-side connection from agent {} at Port {}",
socketAddress.getHostName(),
socketAddress.getPort());
this.context = ctx;
remoteAgentIP = socketAddress.getHostName();
remoteAgentPort = socketAddress.getPort();
requestListenerInitiator.addRequestListener(this);
if (this == null ) log.info("EHy nULLL ");
// Utils.router.getContext().getAttributes().put("agent-callback", requestListenerInitiator);
StatCollector.getStatCollector().connectionAdded();
startTime = System.currentTimeMillis();
}
private boolean isMineChannel(RequestTemplateWrapper request, AgentServerHandler handler) {
// if (handler == null) log.info("nULLLL"); else log.info("not null");
return request.getPorts().contains(((InetSocketAddress) handler.context.channel().remoteAddress()).getPort());
}
/* Whenever AgentServer receives new port request from AgentClient.
This method will be called and all the open channels
will be notified. */
@Override
public void newIncomingRequest(RequestTemplateWrapper request) {
endHostHandler = getHostHandler(request);
if (isMineChannel(request, this)) {
endHostHandler.addChannel(this.context.channel());
log.debug("Channel added for Client {}:{} Agent Port {}",
request.getRequest().getClientIP(),
request.getRequest().getClientPort(),
(((InetSocketAddress) this.context.channel().remoteAddress())).getPort());
this.buffer = bufferManager.addBuffer(request, endHostHandler);
}
endHostHandler.setBuffer(buffer);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
totalBytes += ((ByteBuf) msg).capacity();
}
}
Bootstrap
private boolean startSocket(int port) {
group = new NioEventLoopGroup();
AgentTrafficShaping ats = new AgentTrafficShaping(group, 5000);
ats.setStatListener(this);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline()
.addLast("agent-traffic-shapping", ats)
.addLast("lengthdecorder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
// .addLast("bytesDecoder", new ByteArrayDecoder())
.addLast(new AgentServerHandler())
.addLast("4blength", new LengthFieldPrepender(4))
// .addLast("bytesEncoder", new ByteArrayEncoder())
;
}
}
);
ChannelFuture f = b.bind().sync();
log.info("Started agent-side server at Port {}", port);
return true;
// Need to do socket closing handling. close all the remaining open sockets
//System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress());
//f.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("Error starting agent-side server");
e.printStackTrace();
return false;
} finally {
//group.shutdownGracefully().sync();
}
}
What could be possible cause here. I know netty uses reference count to keep track of Buffers. I am just releasing the reference as soon as I get a message so that shouldn't be problem !
This happens because the client is writing faster than what the server can process. This ends up filling up the client buffer (memory) and eventual crash. The solution is to adjust the client send rate based on the server. One way to achieve this is that the server periodically reports the reading rate to the client and the client adjusts the write speed based on that.