Search code examples
apache-kafkaapache-zookeeperkafka-producer-api

Not able to produce message in kafka Topic


I am new to Kafka I have downloaded Kafka(0.10.1) and started both Kafka and zookeeper server on my local.Below mention is the code through which I am producing messages in Kafka.

package com.pkg.kafka;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducer {

    public static void main(String args[]) throws IOException{
        Properties properties = new Properties();
        InputStream stream = new FileInputStream("/Users/apple/Documents/workspace-sts-3.7.3.RELEASE/kafka/properties/producer.properties");
        properties.load(stream);
        @SuppressWarnings("resource")
        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
        for(int i = 0;i< 1;i++){
            producer.send(new ProducerRecord<String, String>("producer-testing1",Integer.toString(i),Integer.toString(i)));
        }
        producer.close();
       }
}

But as I am running the code my my zookeeper log is generating these warnings:

[2016-11-14 08:57:47,553] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 1142898 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,553] INFO Closed socket connection for client /127.0.0.1:63833 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,656] INFO Accepted socket connection from /127.0.0.1:63834 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-11-14 08:57:47,757] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 1142898 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,757] INFO Closed socket connection for client /127.0.0.1:63834 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,858] INFO Accepted socket connection from /127.0.0.1:63835 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-11-14 08:57:47,959] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 1142898 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:47,959] INFO Closed socket connection for client /127.0.0.1:63835 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:48,060] INFO Accepted socket connection from /127.0.0.1:63836 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-11-14 08:57:48,161] WARN Exception causing close of session 0x0 due to java.io.IOException: Unreasonable length = 1142898 (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:48,161] INFO Closed socket connection for client /127.0.0.1:63836 (no session established for client) (org.apache.zookeeper.server.NIOServerCnxn)
[2016-11-14 08:57:48,262] INFO Accepted socket connection from /127.0.0.1:63837 (org.apache.zookeeper.server.NIOServerCnxnFactory)

and the message is not getting produced.So any suggestions how to fix this?


Solution

  • KafkaProducer.send(ProducerRecord) is an asynchronous method. If you want to check whether the message is delivered successfully or failed, just invoke the same method with a callback:

    producer.send(myRecord,
                   new Callback() {
                       public void onCompletion(RecordMetadata metadata, Exception e) {
                           if(e != null)
                               e.printStackTrace();
                           System.out.println("The offset of the record we just sent is: " + metadata.offset());
                       }
                   });
    

    If the message got failed to be published, exceptions will be thrown out.