apache-spark, apache-kafka, spark-streaming

Spark Streaming not executing the lines of code within foreach


Quick question on Spark Streaming.

I am initialising a direct stream with KafkaUtils.createDirectStream and saving it as an InputDStream in Spark Streaming, as below.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      context,
      PreferConsistent,
      Subscribe[String, String](topicList, KafkaParameters)
    )

  stream
      .foreachRDD { rdd =>
        println("Executing within rdd")
        val rddSize = rdd.count()
        if (rddSize > 0) { println(s"Received data $rddSize") }
        else { println("Not received any data") }
      }

  context.start()

I am able to see output for the produced data for almost 50 minutes. After that, I can see the messages below in the logs:

Seeking to LATEST offset of partition topic_name-partition_number
Resetting offset for partition topic_name-partition_number to offset 12908.

but no further logs saying "Executing within rdd", "Received data $rddSize", or "Not received any data".

The whole logic works fine when I start the consumer, but after some time it just stops working. Any idea what's going on here?


Solution

  • You haven't shown KafkaParameters, but the Kafka consumer defaults to auto.offset.reset=latest, which means it starts at the end of the topic, where there is nothing to consume. And as your logs show, the consumer is resetting and seeking back to the end of the topic. With that config you need an actively running producer to see any data; alternatively, set auto.offset.reset=earliest so a new consumer group reads from the beginning of the topic (a sketch of such parameters is shown after this answer).

    Also, Spark Streaming (the DStream API; this is not Kafka Streams) is deprecated, and all of its features work similarly in Spark Structured Streaming, at least for Kafka; a minimal Structured Streaming sketch is also included below.
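
    As a starting point, here is a minimal sketch of what KafkaParameters could look like with auto.offset.reset set to "earliest". The broker address and group id are assumptions, since the actual values were not shown in the question.

      import org.apache.kafka.common.serialization.StringDeserializer

      // Hypothetical KafkaParameters; the original map was not shown in the question.
      // auto.offset.reset=earliest makes a new consumer group start from the
      // beginning of the topic instead of waiting for newly produced records.
      val KafkaParameters: Map[String, Object] = Map[String, Object](
        "bootstrap.servers"  -> "localhost:9092",           // assumed broker address
        "key.deserializer"   -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id"           -> "example-consumer-group",   // assumed group id
        "auto.offset.reset"  -> "earliest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )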
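
    And here is a rough sketch of an equivalent pipeline in Spark Structured Streaming, again with an assumed broker address and topic name; it reads the topic from the earliest offsets and prints the records to the console.

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder.appName("KafkaStructuredStreamingExample").getOrCreate()

      // Read the Kafka topic as a streaming DataFrame (broker and topic are placeholders).
      val df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic_name")
        .option("startingOffsets", "earliest")
        .load()

      // Kafka delivers key/value as binary columns; cast them to strings and write to the console.
      val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream
        .format("console")
        .start()

      query.awaitTermination()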