Tags: java, apache-kafka, netty, kafka-producer-api

Netty request timeout


I'm trying to write an HTTP service with Netty that takes data from HTTP requests and puts it into Kafka. I need to handle 20K RPS on an m5.large EC2 instance, which seems quite doable.

The code is simple:

Server.java

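import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteBufferSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// PORT and KAFKA_HOST are constants defined elsewhere; their definitions are not shown here.
public class Server {
    public static void main(final String[] args) throws Exception {
        // bossGroup accepts connections; workerGroup runs the pipelines of accepted channels.
        final EventLoopGroup bossGroup = new EpollEventLoopGroup();
        final EventLoopGroup workerGroup = new EpollEventLoopGroup();

        try {
            final ServerBootstrap bootstrap = new ServerBootstrap();

            bootstrap
                .group(bossGroup, workerGroup)
                .channel(EpollServerSocketChannel.class)
                // A single producer instance is shared by every connection.
                .childHandler(new RequestChannelInitializer(createProducer()))
                .childOption(ChannelOption.SO_KEEPALIVE, true);
            // Bind, then block until the server channel is closed.
            bootstrap.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private static Producer<String, ByteBuffer> createProducer() {
        final Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaBidRequestProducer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class.getName());
        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
        // Upper bound on how long send() may block waiting for metadata or buffer space.
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
        // send.buffer.bytes is the TCP socket send buffer (SO_SNDBUF); the producer's
        // in-memory record buffer is buffer.memory (BUFFER_MEMORY_CONFIG).
        properties.put(ProducerConfig.SEND_BUFFER_CONFIG, 33554432);

        return new KafkaProducer<>(properties);
    }
}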

RequestChannelInitializer.java

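import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import java.nio.ByteBuffer;
import org.apache.kafka.clients.producer.Producer;

public class RequestChannelInitializer extends io.netty.channel.ChannelInitializer<SocketChannel> {
    private final Producer<String, ByteBuffer> producer;

    public RequestChannelInitializer(final Producer<String, ByteBuffer> producer) {
        this.producer = producer;
    }

    @Override
    public void initChannel(final SocketChannel ch) {
        // HTTP request decoder and response encoder.
        ch.pipeline().addLast(new HttpServerCodec());
        // Combines message parts into a single FullHttpMessage, with bodies up to 1 MiB.
        ch.pipeline().addLast(new HttpObjectAggregator(1048576));
        ch.pipeline().addLast(new RequestHandler(producer));
    }
}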

RequestHandler.java

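import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpUtil;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RequestHandler extends SimpleChannelInboundHandler<FullHttpMessage> {
    private final Producer<String, ByteBuffer> producer;

    public RequestHandler(final Producer<String, ByteBuffer> producer) {
        this.producer = producer;
    }

    @Override
    public void channelReadComplete(final ChannelHandlerContext ctx) {
        // Flush the responses queued by ctx.write() in channelRead0.
        ctx.flush();
    }

    // Runs on this channel's event-loop thread; anything that blocks here
    // also delays every other connection served by the same loop.
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpMessage msg) {
        final DefaultFullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
        final ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(
            "test",
            UUID.randomUUID().toString(),
            // View of the request body; msg is released automatically after channelRead0 returns.
            msg.content().nioBuffer()
        );

        // Asynchronous, but can block for up to max.block.ms (metadata fetch or full buffer.memory).
        producer.send(record);

        if (HttpUtil.isKeepAlive(msg)) {
            response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }

        // Closes the connection after every response, even when keep-alive was advertised above.
        ctx.write(response).addListener(ChannelFutureListener.CLOSE);
    }
}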
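The keep-alive branch in the handler delegates to Netty's HttpUtil.isKeepAlive. As a rough, stdlib-only illustration of the rule it applies (an HTTP/1.1 request stays open unless it sends "Connection: close"; an HTTP/1.0 request must ask for "Connection: keep-alive"), here is a simplified sketch. KeepAliveRule and isKeepAlive are hypothetical names, not Netty's code, and header handling is reduced to a single Connection value.

```java
import java.util.Arrays;
import java.util.Locale;

/** Simplified, hypothetical restatement of the HTTP keep-alive rule (not Netty's implementation). */
public class KeepAliveRule {
    /**
     * @param httpVersion      e.g. "HTTP/1.1" or "HTTP/1.0"
     * @param connectionHeader value of the Connection header, or null if absent
     */
    static boolean isKeepAlive(String httpVersion, String connectionHeader) {
        boolean close = hasToken(connectionHeader, "close");
        boolean keepAliveToken = hasToken(connectionHeader, "keep-alive");
        // HTTP/1.1 connections are persistent by default; HTTP/1.0 must opt in.
        boolean keepAliveDefault = "HTTP/1.1".equals(httpVersion);
        return !close && (keepAliveDefault || keepAliveToken);
    }

    // The Connection header is a comma-separated, case-insensitive list of tokens.
    private static boolean hasToken(String header, String token) {
        if (header == null) {
            return false;
        }
        return Arrays.stream(header.split(","))
                .map(t -> t.trim().toLowerCase(Locale.ROOT))
                .anyMatch(token::equals);
    }

    public static void main(String[] args) {
        System.out.println(isKeepAlive("HTTP/1.1", null));         // true
        System.out.println(isKeepAlive("HTTP/1.1", "close"));      // false
        System.out.println(isKeepAlive("HTTP/1.0", null));         // false
        System.out.println(isKeepAlive("HTTP/1.0", "Keep-Alive")); // true
    }
}
```

Note that the handler above sets "Connection: keep-alive" whenever this rule says the client wants a persistent connection, but it always attaches ChannelFutureListener.CLOSE; Netty's HTTP hello-world example adds the close listener only when keep-alive is false.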

The code is taken from the official documentation. However, my load test sometimes fails with exceptions like this one: Request 'Post BidRequest' failed: j.u.c.TimeoutException: Request timeout after 60000 ms

As far as I understand, this means that the connection between my load-test instance and the service instance was established, but the request took longer than 60 seconds to complete. What part of this simple program can block for that long?

I've tuned the Kafka producer by decreasing its timeouts. I know that send can block, so I've increased the send buffer, but it didn't help. I've also increased the ulimits for the service user. I'm running OpenJDK 1.8.0_171, and securerandom.source is set to file:/dev/urandom, so the call to randomUUID should not block.
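To make the blocking concern concrete: Netty runs a channel's handlers on a single event-loop thread, so if producer.send blocks inside channelRead0 (for example while waiting up to max.block.ms for metadata or for free space in buffer.memory), every other request queued on that loop waits too. The toy model below is a stdlib-only sketch, not Netty or Kafka code: a single-thread executor stands in for one event loop, and a hypothetical blockingSend that sleeps stands in for a blocked send().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Toy model: one single-threaded executor plays the role of a Netty event loop. */
public class EventLoopStall {
    // Hypothetical stand-in for a producer.send() that blocks (bounded by max.block.ms in Kafka).
    static void blockingSend(long blockMillis) throws InterruptedException {
        Thread.sleep(blockMillis);
    }

    /** Returns how long a trivial request waited behind one whose handler blocked for blockMillis. */
    static long latencyOfNextRequestMillis(long blockMillis) throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            long submitted = System.nanoTime();
            // Request A: its handler blocks the loop thread.
            eventLoop.submit(() -> {
                blockingSend(blockMillis);
                return null;
            });
            // Request B: almost no work, but queued on the same thread behind A.
            Future<Long> b = eventLoop.submit(() -> System.nanoTime() - submitted);
            return TimeUnit.NANOSECONDS.toMillis(b.get());
        } finally {
            eventLoop.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        long waited = latencyOfNextRequestMillis(500);
        System.out.println("request B waited ~" + waited + " ms");
    }
}
```

Assuming Netty's default sizing (twice the number of available processors) and that the JVM sees both vCPUs of an m5.large, the worker group has four event loops, so one blocked send() delays roughly a quarter of the open connections for as long as it blocks.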


Solution

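  • You're right, there's nothing in there that should block for 60 seconds. The call to send to Kafka is asynchronous; it only blocks while waiting for metadata or for buffer space, and that wait is capped by max.block.ms, which you've set to 10 seconds. I looked through your code, and everything looks good from what I can see.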

    A couple of things I'd check:

    • Make sure that the Security Group definitions in AWS allow the app to reach the Kafka brokers and the brokers to reach ZooKeeper (the Java producer only talks to the brokers, not to ZooKeeper). If this is a test/POC, just allow all traffic between all three instances/clusters. The 60-second timeout makes me suspect a network timeout, which might mean some service is not reachable.
    • Have you tried an even simpler test: producing to Kafka without the Netty dependency? That may help narrow down the problem.
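For the network-reachability check in the first bullet, one quick test that involves neither Netty nor the Kafka client is to open a plain TCP connection to each broker with an explicit connect timeout, so an unreachable host fails fast instead of hanging. A minimal sketch using only java.net.Socket; the canConnect helper is hypothetical, and the host and port you pass would be your broker's (for example the address in KAFKA_HOST; 9092 is only Kafka's default port).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: can a TCP connection to host:port be opened within timeoutMillis? */
public class ReachabilityCheck {
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws SocketTimeoutException if nothing answers in time
            // (typical when a security group silently drops the traffic) and
            // ConnectException if the host actively refuses the connection.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

Running it from the app instance against every broker address (and from the brokers against ZooKeeper) separates "blocked by the network" from "slow inside the service" without touching the HTTP path.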