Let us say I have just launched a Kafka direct stream + Spark Streaming application. For the first batch, the StreamingContext in the driver program connects to Kafka and fetches the startOffset and endOffset. It then launches a Spark job with these offset ranges so that the executors can fetch the records from Kafka. My question starts here. When it is time for the second batch, the StreamingContext connects to Kafka again for the start and end offset ranges. How is Kafka able to provide these ranges when there is no consumer group (as Direct stream does not take into account group.id) in which to store the last committed offset?
There is always a Consumer Group when working with the Kafka Consumer API. It doesn't matter what kind of Stream you are dealing with (Spark Direct Streaming, Spark Structured Streaming, Java/Scala API of Kafka Consumer...).
You wrote: "as Direct stream does not take into account group.id"
Have a look at the Spark + Kafka Integration Guide for direct streaming (the code example for spark-streaming-kafka-0-10) to see how a consumer group is declared:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Consumer configuration; note that group.id is set explicitly.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")

// streamingContext is assumed to be an existing StreamingContext.
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Each element of the stream is a ConsumerRecord[String, String].
stream.map(record => (record.key, record.value))
Even if you do not declare a consumer group in your configuration, a (random) consumer group will still be created for you.
Check your logs to see which group.id has been used in your application.
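Since the configuration above sets enable.auto.commit to false, offsets are only stored under that group.id if the application commits them itself. Below is a minimal sketch of how that could look with the spark-streaming-kafka-0-10 API. The object name CommitOffsetsSketch and the helpers describe and processAndCommit are hypothetical names chosen for illustration; HasOffsetRanges, CanCommitOffsets and OffsetRange come from the integration library.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

// Hypothetical helper object, for illustration only.
object CommitOffsetsSketch {

  // Formats one offset range for logging; untilOffset is exclusive.
  def describe(r: OffsetRange): String =
    s"${r.topic}-${r.partition}: [${r.fromOffset}, ${r.untilOffset}) count=${r.count()}"

  // `stream` is assumed to be the DStream returned directly by
  // KafkaUtils.createDirectStream, since that object implements CanCommitOffsets.
  def processAndCommit(stream: InputDStream[ConsumerRecord[String, String]]): Unit =
    stream.foreachRDD { rdd =>
      // Offset ranges the driver computed for this batch, one per topic partition.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(r => println(describe(r)))

      // ... write the results of this batch to the output system here ...

      // Asynchronously commit the ranges to Kafka under the configured group.id.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
}
```

Calling CommitOffsetsSketch.processAndCommit(stream) before streamingContext.start() would register this logic for every batch. The cast to HasOffsetRanges has to happen on the RDD that comes straight from the direct stream, before any transformation is applied to it.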