Search code examples
apache-kafkakafka-consumer-apikafka-producer-api

My producer can create a topic, but data doesn't seem to be stored inside the broker


My producer can create a topic, but it doesn't seem to store any data inside a broker. I can check that the topic is created with kafka-topics script.

When I tried to consume with kafka-console-consumer, it doesn't consume anything. (I know --from-beginning.)

When I produced with kafka-console-producer, my consumer(kafka-console-consumer) can consume it right away. So there is something wrong with my java code.

And when I run my code with localhost:9092, it worked fine. And when I consume the topic with my consumer code, it was working properly. My producer works with Kafka server on my local machine but doesn't work with another Kafka server on remote machine.

Code :

//this code is inside the main method
Properties properties = new Properties();
        //properties.put("bootstrap.servers", "localhost:9092");  
        //When I used localhost, my consumer code consumes it fine. 
        properties.put("bootstrap.servers", "192.168.0.30:9092");        
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        
        ProducerRecord<String, String> record = new ProducerRecord<>("test5", "1111","jin1111");
        //topc is created, but consumer can't consume any data. 
        //I tried putting different values for key and value parameters but no avail.

        try {
            kafkaProducer.send(record);
            System.out.println("complete");
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            kafkaProducer.close();
            System.out.println("closed");
        }
           
        /*//try{
        for(int i = 0; i < 10000; i++){
            System.out.println(i);
            kafkaProducer.send(new ProducerRecord("test", Integer.toString(i), "message - " + i ));
        }*/

My CLI (Putty) :

enter image description here

I want to see my consumer consuming when I run my java code. (Those data shown in the image are from the producer script.)


update

After reading answers and comments, this is what I've tried so far. Still not consuming any messages. I think message produced in this code is not stored in the broker. I tried with the different server, too. The same problem. Topic was created, but no consumer exists in the consumer group list and can't consume. And no data can be consumed with consumer script.

I also tried permission change. (chown) and tried with etc/hosts files. but no luck. I'll keep on trying until I solve this.

public static void main(String[] args){
        Properties properties = new Properties();
        //properties.put("bootstrap.servers", "localhost:9092");
        properties.put("bootstrap.servers", "192.168.0.30:9092");        
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("linger.ms", "1");
        properties.put("batch.size", "16384");
        properties.put("request.timeout.ms", "30000");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        
        ProducerRecord<String, String> record = new ProducerRecord<>("test5", "1111","jin1111");
        System.out.println("1");
        try {
            kafkaProducer.send(record);
            //kafkaProducer.send(record).get();
            // implement Callback 
            System.out.println("complete");
            kafkaProducer.flush();
            System.out.println("flush completed");
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            kafkaProducer.flush();
            System.out.println("another flush test");
            kafkaProducer.close();
            System.out.println("closed");
        }       
}   

When I run this in Eclipse, the console shows :

enter image description here

enter image description here


Solution

  • I finally figured out. If you experienced similar problem, there are things you can do.

    In your server.properties, uncomment these and put the ip and port. (There seems to be a problem with the port, so I changed it.)

    listeners=PLAINTEXT://192.168.0.30:9093
    advertised.listeners=PLAINTEXT://192.168.0.30:9093
    

    (Before restarting your broker with your changed server.properties, you might want to clean all existing log.dir. Try this, if nothing works)

    Some other things you might want to consider :

    • change your log.dir. Usually the default path is tmp, but sometimes there is a noexec setting, so configure to a different location
    • check your etc/hosts
    • check your permission : And use chown and chmod
    • change zookeeper port and kafka port if necessary.
    • change broker.id

    My working producer code :

    public class Producer1 {
    
        public static void main(String[] args){
            Properties properties = new Properties();            
            properties.put("bootstrap.servers", "192.168.0.30:9093");        
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    
            ProducerRecord<String, String> record = new ProducerRecord<>("test", "1","jin");
    
            try {           
                kafkaProducer.send(record);
                System.out.println("complete");                     
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaProducer.close();
                System.out.println("closed");
            }              
        }   
    }
    
    
    

    working Consumer code:

    public class Consumer1 {
    
        public static void main(String[] args) {
    
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.30:9093");
            props.put("group.id", "jin");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    
            consumer.subscribe(Collections.singletonList("test"));
    
            try {
                while (true) {              
    
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    
                    for (ConsumerRecord<String, String> record : records){
                        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());          
                    }                           
                }
            } catch (Exception e){
                e.printStackTrace();
            } finally {
                consumer.close();
                System.out.println("closed");
            }   
        }
    }
    

    Console : enter image description here