Kafka producer hangs on send

I have a streaming job that reads data from a custom source and has to write it to both Kafka and HDFS.

I wrote a (very) basic Kafka producer to do this. However, the whole streaming job hangs on the send method.

class KafkaProducer(val kafkaBootstrapServers: String, val kafkaTopic: String, val sslCertificatePath: String, val sslCertificatePassword: String) {

  val kafkaProps: Properties = new Properties()
  kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers)
  kafkaProps.put("acks", "1")
  kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put("ssl.truststore.location", sslCertificatePath)
  kafkaProps.put("ssl.truststore.password", sslCertificatePassword)

  val kafkaProducer: KafkaProducer[Long, Array[String]] = new KafkaProducer(kafkaProps)

  def sendKafkaMessage(message: Message): Unit = { => {
      val producerRecord: ProducerRecord[Long, Array[String]] = new ProducerRecord[Long, Array[String]](kafkaTopic, message.timeStamp.getTime, list.toArray)

And the code calling the producer:

receiverStream.foreachRDD(rdd => {
      val messageRowRDD: RDD[Row] = rdd.mapPartitions(partition => {
        val parser: Parser = new Parser
        val kafkaProducer: KafkaProducer = new KafkaProducer(kafkaBootstrapServers, kafkaTopic, kafkaSslCertificatePath, kafkaSslCertificatePass)
        val newPartition = => {
          Logger.getLogger("importer").error("Writing Message to Kafka...")
          Logger.getLogger("importer").error("Finished writing Message to Kafka")
 => parser.parseMessage(Message.timeStamp.getTime, singleMessage))

      val df = sqlContext.createDataFrame(messageRowRDD, Schema.messageSchema)

      Logger.getLogger("importer").info("Entries-count: " + df.count())
      val row = Try(df.first)

      row match {
        case Success(s) => Persister.writeDataframeToDisk(df, outputFolder)
        case Failure(e) => Logger.getLogger("importer").warn("Resulting DataFrame is empty. Nothing can be written")

From the logs I can tell that each executor reaches the "Writing Message to Kafka..." line but gets no further. All executors hang at that point, and no exception is thrown.

The Message class is a very simple case class with two fields: a timestamp and an array of strings.


  • This was due to the producer's acks setting in Kafka.

    Once acks was set to 1, sends went ahead a lot faster.
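
    For reference, here is a minimal sketch of setting these options explicitly. It is an illustration under stated assumptions, not the configuration of the original job: the helper name producerProps, the broker address, and the 10-second max.block.ms value are all made up for the example. With acks=1 the partition leader acknowledges a record as soon as it has written it to its own log. With acks=all it also waits for the in-sync replicas, which is slower. max.block.ms caps how long send may block while waiting for metadata or buffer space (the default is 60 seconds), so a stuck producer fails with a timeout instead of appearing to hang.

```scala
import java.util.Properties

object ProducerSettingsSketch {

  // Hypothetical helper: builds producer settings with an explicit acks level.
  // The keys are standard Kafka producer configuration names; the values are
  // illustrative assumptions and should be tuned for the actual cluster.
  def producerProps(bootstrapServers: String, acks: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // "0" = do not wait, "1" = leader only, "all" = leader plus in-sync replicas
    props.setProperty("acks", acks)
    // Upper bound on how long send() may block (assumed value: 10 seconds)
    props.setProperty("max.block.ms", "10000")
    props.setProperty("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  def main(args: Array[String]): Unit = {
    // "broker:9092" is a placeholder address, not taken from the question
    val props = producerProps("broker:9092", "1")
    println(props.getProperty("acks"))
  }
}
```

    The resulting Properties object can be passed to the KafkaProducer constructor in the same way as kafkaProps in the question.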