I'm trying to write an HTTP service, using Netty, that takes data over HTTP and puts it into Kafka. I need to handle 20K RPS on an m5.large EC2 instance, which seems pretty doable.
The code is simple:
Server.java
public class Server {
    public static void main(final String[] args) throws Exception {
        final EventLoopGroup bossGroup = new EpollEventLoopGroup();
        final EventLoopGroup workerGroup = new EpollEventLoopGroup();
        try {
            final ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap
                .group(bossGroup, workerGroup)
                .channel(EpollServerSocketChannel.class)
                .childHandler(new RequestChannelInitializer(createProducer()))
                .childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private static Producer<String, ByteBuffer> createProducer() {
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaBidRequestProducer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class.getName());
        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
        properties.put(ProducerConfig.SEND_BUFFER_CONFIG, 33554432);
        return new KafkaProducer<>(properties);
    }
}
RequestChannelInitializer.java
public class RequestChannelInitializer extends io.netty.channel.ChannelInitializer<SocketChannel> {
    private final Producer<String, ByteBuffer> producer;

    public RequestChannelInitializer(final Producer<String, ByteBuffer> producer) {
        this.producer = producer;
    }

    @Override
    public void initChannel(final SocketChannel ch) {
        ch.pipeline().addLast(new HttpServerCodec());
        ch.pipeline().addLast(new HttpObjectAggregator(1048576));
        ch.pipeline().addLast(new RequestHandler(producer));
    }
}
RequestHandler.java
public class RequestHandler extends SimpleChannelInboundHandler<FullHttpMessage> {
    private final Producer<String, ByteBuffer> producer;

    public RequestHandler(final Producer<String, ByteBuffer> producer) {
        this.producer = producer;
    }

    @Override
    public void channelReadComplete(final ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpMessage msg) {
        final DefaultFullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
        final ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(
            "test",
            UUID.randomUUID().toString(),
            msg.content().nioBuffer()
        );
        producer.send(record);
        if (HttpUtil.isKeepAlive(msg)) {
            response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        ctx.write(response).addListener(ChannelFutureListener.CLOSE);
    }
}
The code is taken from the official documentation. However, I sometimes get Request 'Post BidRequest' failed: j.u.c.TimeoutException: Request timeout after 60000 ms
exceptions in my load test.
As far as I understand, this means that a connection was established between my load-test instance and the service instance, but the request took longer than 60 seconds to complete. What part of this simple program can block for that long?
I've tuned the Kafka producer: decreased its timeout. I know that send
can be blocking, so I've increased the send buffer, but it didn't help.
I've also increased ulimits
for the service user.
I'm running on OpenJDK version 1.8.0_171, and securerandom.source
is set to file:/dev/urandom
, so the call to randomUUID
should not block.
You're right, there's nothing in there that should block: the send call to Kafka is asynchronous. I looked through your code, and everything looks good from what I can see.
A couple of things I'd check:
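For instance, whether anything can stall a worker event-loop thread: if send() ever does block (metadata fetch, full internal buffer, up to max.block.ms), every channel on that event loop stalls with it, and queued requests can easily exceed your load tester's 60-second timeout. A stdlib-only sketch of the offloading pattern, with a single-threaded executor standing in for a Netty event loop (all the names here are my illustration, not from your code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OffloadSketch {
    // Single-threaded stand-in for a Netty event loop.
    private static final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    // Separate pool for work that may block (e.g. a producer send that
    // stalls on metadata or a full buffer).
    private static final ExecutorService blockingPool = Executors.newFixedThreadPool(4);

    public static void main(final String[] args) throws Exception {
        final CompletableFuture<Long> result = new CompletableFuture<>();
        eventLoop.submit(() -> {
            final long start = System.nanoTime();
            // Hand the potentially blocking call off instead of running
            // it inline on the event-loop thread.
            CompletableFuture
                .supplyAsync(OffloadSketch::potentiallyBlockingSend, blockingPool)
                .thenAccept(result::complete);
            final long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            // The event-loop thread is free again almost immediately.
            System.out.println("event loop blocked for ~" + elapsedMs + " ms");
        });
        System.out.println("send result: " + result.get(5, TimeUnit.SECONDS));
        eventLoop.shutdown();
        blockingPool.shutdown();
    }

    private static Long potentiallyBlockingSend() {
        try {
            Thread.sleep(200); // simulate a 200 ms stall
        } catch (final InterruptedException ignored) {
        }
        return 42L;
    }
}
```

In your real handler the equivalent move would be to keep the event loop free and complete the HTTP response from a callback rather than inline.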