apache-kafka kafka-producer-api

{Kafka V 0.10.1} Producing a message takes approximately 1 minute if an error occurs in the async producer


I'm using an asynchronous producer to send messages to the broker. When the callback reports no error, producing a message takes approximately 0.3 seconds. But when I get the error below [1], producing a message takes 60 seconds. I do not see any message loss, though: all messages are available in the broker. What causes this error? I see this delay roughly once every 50 messages I produce. How can I improve the producer's performance when I get this error?

Code:

    producer.send(new ProducerRecord(topic, this), new ProducerCallback());

    private class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception ex) {
            if (ex != null) {
                log.error("Error when publishing messages to the topic. Topic :" + recordMetadata.topic(), ex);
            }
        }
    }

Producer properties:

    acks=1
    linger.ms=10
    batch.size=51200
    bootstrap.servers=aukk1.xx.com\:9092,aukk2.xx.com\:9092,aukk3.xx.com\:9092
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=com.xx.KafkaPayloadSerializer

[1]

04:51:20,025 ERROR [org.apache.kafka.clients.producer.internals.RecordBatch] (kafka-producer-network-thread | producer-673) Error executing user-provided callback on message for topic-partition RAW_XML1harveyzhu-1:: java.lang.NullPointerException
       at com.lxx.kafkamodels.KafkaPayload$ProducerCallback.onCompletion(KafkaPayload.java:204)
       at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
       at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:155)
       at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
       at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:205)
       at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
       at java.lang.Thread.run(Thread.java:745)

Solution

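  • If ex is not null, the first parameter, RecordMetadata, is null, which is why you see an NPE when invoking recordMetadata.topic() inside the error branch. The stack trace shows the callback being invoked from RecordBatch.maybeExpire, which suggests the underlying exception is a batch expiry that the NPE hides. Log ex without dereferencing recordMetadata to see the real cause.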
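
A null-safe version of the callback could look like the sketch below. It is only an illustration under stated assumptions: RecordMetadata and Callback here are small local stand-ins that mirror the shape of the Kafka client types so the example compiles without the kafka-clients jar, the errors list is a hypothetical replacement for the logger, and "my-topic" is a made-up topic name. The idea is to pass the topic into the callback when calling send(), so the error branch never touches recordMetadata.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeCallbackDemo {

    // Stand-in for org.apache.kafka.clients.producer.RecordMetadata (only topic() is modeled).
    static final class RecordMetadata {
        private final String topic;
        RecordMetadata(String topic) { this.topic = topic; }
        String topic() { return topic; }
    }

    // Stand-in for org.apache.kafka.clients.producer.Callback.
    interface Callback {
        void onCompletion(RecordMetadata recordMetadata, Exception ex);
    }

    // Callback that remembers the topic given at send() time
    // instead of reading it from the (possibly null) metadata.
    static final class ProducerCallback implements Callback {
        private final String topic;
        // Hypothetical sink used here in place of log.error(...).
        final List<String> errors = new ArrayList<>();

        ProducerCallback(String topic) {
            this.topic = topic;
        }

        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception ex) {
            if (ex != null) {
                // recordMetadata is null on failure in this client version,
                // so only the stored topic and the exception are used.
                errors.add("Error when publishing messages to the topic. Topic: "
                        + topic + ", cause: " + ex);
            }
        }
    }

    public static void main(String[] args) {
        ProducerCallback cb = new ProducerCallback("my-topic");
        // Simulate what the producer does on failure: null metadata plus an exception.
        cb.onCompletion(null, new RuntimeException("batch expired"));
        System.out.println(cb.errors.get(0));
    }
}
```

With the real client, the call site would become something like producer.send(new ProducerRecord(topic, this), new ProducerCallback(topic)), with log.error(..., ex) in place of the list.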