scala, cloudera-cdh, kafka-consumer-api, kafka-producer-api

Kafka producer creates topic but is not able to send messages


I'm new to Scala and Kafka and I've run into some trouble.

I'm trying to connect a Scala Kafka producer to a Kafka server installed on a Cloudera Express server. I have done this once before in VMs, following these instructions, without any problems.

When I run the producer, the topic is created but, as far as I can tell, none of the messages are sent.

Here is some of the code:

Kafka producer

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaProducerManager {

    val props = new Properties()
    props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
    props.put("acks", "all")
    props.put("retries", "2")
    props.put("auto.commit.interval.ms", "1000")
    props.put("linger.ms", "1")
    props.put("block.on.buffer.full", "true")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("auto.create.topics.enable", "true")

    val producer = new KafkaProducer[String, String](props)

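    // Sends 100 records to "test-counter", printing a line after each send() call.
    def startCounter(): Unit = {
        println("Start Producer Counter")
        for (i <- 1 to 100) {
            producer.send(new ProducerRecord("test-counter", i.toString, "Package " + i))
            println("Producer - Send: " + i)
        }

        println("Closing producer")
        // close() blocks until previously sent requests complete.
        producer.close()
    }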
}

When I run this code, I see "Producer - Send: #" printed for every message and get no errors, so I assume the messages were sent to Kafka.

Before running the producer, I started the following console consumer on the Kafka server:

 kafka-console-consumer --zookeeper zk:2181 --topic test-counter

But nothing shows up there.

When I list the topics, the one the producer is supposed to create is there:

kafka-topics --zookeeper zk:2181 --list

I also have a similar problem with the consumer:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

class KafkaConsumerManager {

    val props = new Properties()
    props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS)
    props.put("group.id", "testGroup")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("linger.ms", "1")
    props.put("session.timeout.ms", "3000")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("zookeeper.connect", KafkaServer.ZOOKEEPER_ADDRESS)

    val consumer = new KafkaConsumer[String, String](props)

    def start(): Unit = {
        println("Start Consumer")
        consumer.subscribe(Arrays.asList("test-counter"))

        // Poll in an endless loop and print every record received.
        while (true) {
            val records = consumer.poll(100)
            val iterator = records.iterator()

            while (iterator.hasNext) {
                val record = iterator.next()
                printf("Consumer: offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value())
            }
        }
    }
}
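Both configurations above mix in keys that belong to a different Kafka component than the client they are passed to. The following is a minimal, hypothetical checker (MisplacedConfigCheck and misplaced are illustrative names, not part of Kafka) that flags such keys using a small hand-picked ownership table; the table is an assumption covering only the keys discussed here, not a complete list of Kafka settings.

```scala
import java.util.Properties

// Hypothetical checker: flags keys owned by a different Kafka component than
// the client they are passed to. The table is a small, hand-picked subset of
// Kafka settings, not an exhaustive list.
object MisplacedConfigCheck {
  private val ownerOf: Map[String, String] = Map(
    "linger.ms"                 -> "producer",
    "key.serializer"            -> "producer",
    "value.serializer"          -> "producer",
    "key.deserializer"          -> "consumer",
    "value.deserializer"        -> "consumer",
    "group.id"                  -> "consumer",
    "enable.auto.commit"        -> "consumer",
    "auto.commit.interval.ms"   -> "consumer",
    "session.timeout.ms"        -> "consumer",
    "auto.create.topics.enable" -> "broker",
    // Used by the broker (and the old ZooKeeper-based consumer), not by KafkaConsumer.
    "zookeeper.connect"         -> "broker"
  )

  /** Returns the keys in props whose known owner differs from client. */
  def misplaced(props: Properties, client: String): Set[String] = {
    // Convert the Java key set without relying on version-specific converters.
    val keys = props.stringPropertyNames().toArray(Array.empty[String]).toSet
    keys.filter(k => ownerOf.get(k).exists(_ != client))
  }
}
```

Applied to the question's producer properties it flags auto.commit.interval.ms and auto.create.topics.enable; applied to the consumer properties it flags linger.ms, key.serializer, value.serializer and zookeeper.connect. Note that keys a client does not recognize are generally only logged as warnings by the Kafka clients, so this sketch points at suspicious settings rather than at the cause of the failure.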

When I create messages on the server with the following kafka-console-producer command, they appear in kafka-console-consumer on the server, but not in the consumer I wrote:

kafka-console-producer --broker-list ks:9092 --topic test-counter

KafkaServer.ZOOKEEPER_ADDRESS is the same as the zk:2181 argument passed to kafka-console-consumer, and KafkaServer.KAFKA_ADDRESS is the same as the ks:9092 argument passed to kafka-console-producer.
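The KafkaServer object itself is not shown in the question. Based only on the addresses stated above, a hypothetical reconstruction that lets the producer and consumer classes compile might look like this; the real object may differ.

```scala
// Hypothetical reconstruction of the KafkaServer object referenced by the
// question's code; only the two values stated in the question are included.
object KafkaServer {
  // Broker address, used as bootstrap.servers by the Java clients.
  val KAFKA_ADDRESS: String = "ks:9092"
  // ZooKeeper address, as passed to the console tools with --zookeeper.
  val ZOOKEEPER_ADDRESS: String = "zk:2181"
}
```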


Solution

  • I tried the code and found that:

    • the consumer properties must specify key and value deserializers (the question's code sets serializers instead):

       props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
       props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      
    • there is a problem with the session.timeout.ms property. From here:

      heartbeat.interval.ms - ... The value must be set lower than session.timeout.ms ... default: 3000

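      With session.timeout.ms set to 3000, it equals the default heartbeat.interval.ms instead of exceeding it. You should therefore either increase session.timeout.ms or simply remove the line, because the default value of session.timeout.ms (30000 at the time of writing) is greater than the default heartbeat.interval.ms.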

    After these corrections, the code works.
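The two corrections can be sketched as a plain java.util.Properties builder plus a timeout check, assuming the String keys and values used in the question. CorrectedConsumerConfig, buildConsumerProps and timeoutsValid are hypothetical names; the 3000 ms heartbeat default is the value quoted above, and the caller supplies the session timeout default because it depends on the client version. The sketch also leaves out linger.ms and zookeeper.connect, which a KafkaConsumer does not use. It deliberately avoids importing kafka-clients; the resulting Properties would be passed to new KafkaConsumer[String, String](props) as in the question.

```scala
import java.util.Properties

// Sketch of the consumer settings after the two corrections in the answer.
// All names here are hypothetical helpers, not part of the Kafka API.
object CorrectedConsumerConfig {
  // Default heartbeat.interval.ms as quoted in the answer (assumption).
  val DefaultHeartbeatMs: Int = 3000

  def buildConsumerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", "testGroup")
    props.setProperty("enable.auto.commit", "true")
    props.setProperty("auto.commit.interval.ms", "1000")
    // Correction 1: a consumer needs deserializers, not serializers.
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Correction 2: session.timeout.ms is left unset so the client default applies.
    props
  }

  /** True when heartbeat.interval.ms is strictly lower than session.timeout.ms. */
  def timeoutsValid(props: Properties, defaultSessionMs: Int): Boolean = {
    val session = Option(props.getProperty("session.timeout.ms")).map(_.toInt).getOrElse(defaultSessionMs)
    val heartbeat = Option(props.getProperty("heartbeat.interval.ms")).map(_.toInt).getOrElse(DefaultHeartbeatMs)
    heartbeat < session
  }
}
```

With 30000 passed as the default session timeout, the check passes for the corrected properties; setting session.timeout.ms back to 3000 makes it fail, because 3000 is not lower than the 3000 ms heartbeat default.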