I am new to Kafka. I have downloaded Kafka (0.10.1) and started both the Kafka and ZooKeeper servers on my local machine. Below is the code I use to produce messages to Kafka:
package com.pkg.kafka;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducer {
    public static void main(String args[]) throws IOException {
        Properties properties = new Properties();
        InputStream stream = new FileInputStream("/Users/apple/Documents/workspace-sts-3.7.3.RELEASE/kafka/properties/producer.properties");
        properties.load(stream);
        @SuppressWarnings("resource")
        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
        for (int i = 0; i < 1; i++) {
            producer.send(new ProducerRecord<String, String>("producer-testing1", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}
But when I run the code, my ZooKeeper log shows these warnings:
[2016-11-14 08:57:47,553] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 1142898 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,553] INFO Closed socket connection for client /127.0.0.1:63833 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,656] INFO Accepted socket connection from /127.0.0.1:63834 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-11-14 08:57:47,757] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 1142898 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,757] INFO Closed socket connection for client /127.0.0.1:63834 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,858] INFO Accepted socket connection from /127.0.0.1:63835 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-11-14 08:57:47,959] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 1142898 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,959] INFO Closed socket connection for client /127.0.0.1:63835 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:48,060] INFO Accepted socket connection from /127.0.0.1:63836 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-11-14 08:57:48,161] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 1142898 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:48,161] INFO Closed socket connection for client /127.0.0.1:63836 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:48,262] INFO Accepted socket connection from /127.0.0.1:63837 (org.apache.zookeeper.server.NIOServerCnxnFactory)
and the message is not produced. Any suggestions on how to fix this?
KafkaProducer.send(ProducerRecord) is an asynchronous method: it returns before the broker has acknowledged the record. To check whether the message was delivered or failed, call the overload that also takes a callback:
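// Requires org.apache.kafka.clients.producer.Callback and RecordMetadata
producer.send(myRecord,
    new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                // Delivery failed; do not rely on metadata in this branch
                e.printStackTrace();
            } else {
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
            }
        }
    });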
If the message fails to be published, the failure is reported through the callback's Exception argument, which is non-null in that case.
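Separately, the "Unreasonable length" warnings in the ZooKeeper log usually appear when a client that does not speak the ZooKeeper protocol connects to ZooKeeper's port. So it is worth checking that bootstrap.servers in producer.properties points at the Kafka broker rather than at ZooKeeper. Here is a minimal sketch of that check, assuming the stock default ports (9092 for the broker, 2181 for ZooKeeper) on localhost. The class ProducerConfigSketch and its build method are hypothetical helpers, not part of the Kafka API; only the property keys and the serializer class name come from Kafka.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
class ProducerConfigSketch {
    // Assumption: ZooKeeper is running on its stock default client port.
    static final int ZOOKEEPER_DEFAULT_PORT = 2181;

    // Builds producer properties and rejects a bootstrap list that targets
    // the assumed ZooKeeper port instead of a broker.
    static Properties build(String bootstrapServers) {
        for (String hostPort : bootstrapServers.split(",")) {
            String trimmed = hostPort.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Expected host:port, got: " + trimmed);
            }
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            if (port == ZOOKEEPER_DEFAULT_PORT) {
                throw new IllegalArgumentException(
                    "bootstrap.servers points at the ZooKeeper port: " + trimmed);
            }
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumption: the broker listens on its stock default port 9092.
        Properties props = build("localhost:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor in place of the one loaded from the file. If your broker or ZooKeeper uses non-default ports, adjust the values accordingly.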