Tags: apache-spark, apache-kafka, apache-spark-sql, spark-structured-streaming, spark-streaming-kafka

Right way to read stream from Kafka topic using checkpointLocation offsets


I'm trying to develop a small Spark app (in Scala) that reads messages from Kafka (Confluent) and inserts them into a Hive table. Everything works as expected except for one important feature: managing offsets when the application is restarted (resubmitted). It confuses me.

An excerpt from my code:

  def main(args: Array[String]): Unit = {

    val sparkSess = SparkSession
      .builder
      .appName("Kafka_to_Hive")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse/")
      .config("hive.metastore.uris", "thrift://localhost:9083")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()

    sparkSess.sparkContext.setLogLevel("ERROR")

    // don't consider this code block please, it's just a part of Confluent avro message deserializing adventures
    sparkSess.udf.register("deserialize", (bytes: Array[Byte]) =>
      DeserializerWrapper.deserializer.deserialize(bytes)
    )
    

    val kafkaDataFrame = sparkSess
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("group.id", "kafka-to-hive-1")
      // ------>   which Kafka options do I need to set here to start from the last correct offset, ensuring completeness of data and "exactly once" writing?   <--------
      .option("failOnDataLoss", false)
      .option("subscribe", "some_topic")
      .load()

    import org.apache.spark.sql.functions._
    
    // don't consider this code block please, it's just a part of Confluent avro message deserializing adventures
    val valueDataFrame = kafkaDataFrame.selectExpr("""deserialize(value) AS message""")
    val df = valueDataFrame.select(
      from_json(col("message"), sparkSchema.dataType).alias("parsed_value"))
      .select("parsed_value.*")


    df.writeStream
      // explicit parameter types (needs import org.apache.spark.sql.DataFrame) let the compiler pick the Scala overload of foreachBatch
      .foreachBatch((batchDataFrame: DataFrame, batchId: Long) => {
        batchDataFrame.createOrReplaceTempView("`some_view_name`")
        val sqlText = "SELECT * FROM `some_view_name` a where some_field='some value'"
        val batchDataFrame_view = batchDataFrame.sparkSession.sql(sqlText)
        batchDataFrame_view.write.insertInto("default.some_hive_table")
      })
      .option("checkpointLocation", "/user/some_user/tmp/checkpointLocation")
      .start()
      .awaitTermination()
  }

Questions (the questions are related to each other):

  1. Which Kafka options do I need to apply on readStream.format("kafka") for starting from last right offset on every submit of spark app?
  2. Do I need to manually read 3rd line of checkpointLocation/offsets/latest_batch file to find last offsets to read from Kafka? I mean something like that: readStream.format("kafka").option("startingOffsets", """{"some_topic":{"2":35079,"5":34854,"4":35537,"1":35357,"3":35436,"0":35213}}""")
  3. What is the right/convenient way to read stream from Kafka (Confluent) topic? (I'm not considering offsets storing engine of Kafka)

Solution

    "Which Kafka options do I need to apply on readStream.format("kafka") for starting from last right offset on every submit of spark app?"

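    To start from the latest offsets on every submit, set startingOffsets=latest and delete the checkpoint files before each run. If a checkpoint exists, the query resumes from the offsets recorded there and ignores startingOffsets.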
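
A minimal sketch of that setup, assuming the broker address, topic, and checkpoint path from the question. The object name and the console sink are placeholders for illustration only, and the code needs the spark-sql-kafka-0-10 connector on the classpath plus a reachable broker:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical object name; not part of the original application.
object StartingOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("starting-offsets-sketch")
      .getOrCreate()

    val kafkaDf: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "some_topic")
      // Applies only when the query starts without an existing checkpoint.
      .option("startingOffsets", "latest")
      .load()

    kafkaDf.writeStream
      // Console sink is a stand-in for the Hive write in the question.
      .format("console")
      // If this directory already holds offsets, the query resumes from them.
      .option("checkpointLocation", "/user/some_user/tmp/checkpointLocation")
      .start()
      .awaitTermination()
  }
}
```

Keeping the same checkpointLocation across restarts is what lets the query pick up where the previous run stopped; no extra Kafka option is involved in that case.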

    "Do I need to manually read 3rd line of checkpointLocation/offsets/latest_batch file to find last offsets to read from Kafka? I mean something like that: readStream.format("kafka").option("startingOffsets", """{"some_topic":{"2":35079,"5":34854,"4":35537,"1":35357,"3":35436,"0":35213}}""")"

    As with the first question, if you set startingOffsets to a JSON string, you need to delete the checkpoint files. Otherwise, the Spark application will always use the offsets stored in the checkpoint files, which override the startingOffsets option.
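
If explicit offsets are really needed for a fresh start (no checkpoint), the JSON string can be built rather than hand-written. The helper below is a hypothetical sketch, not part of Spark; it only produces the `{"topic":{"partition":offset}}` shape shown in the question and assumes topic names need no JSON escaping:

```scala
// Hypothetical helper for illustration; the name and signature are assumptions.
object StartingOffsets {
  // Renders topic -> (partition -> offset) as the JSON shape accepted by
  // the Kafka source's "startingOffsets" option.
  def toJson(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, partitions) =>
      val body = partitions.toSeq.sortBy(_._1)
        .map { case (partition, offset) => "\"" + partition + "\":" + offset }
        .mkString(",")
      "\"" + topic + "\":{" + body + "}"
    }.mkString("{", ",", "}")

  def main(args: Array[String]): Unit = {
    // Partition numbers and offsets taken from the example in the question.
    val json = toJson(Map("some_topic" -> Map(0 -> 35213L, 1 -> 35357L, 2 -> 35079L)))
    println(json) // {"some_topic":{"0":35213,"1":35357,"2":35079}}
  }
}
```

The result would be passed as `.option("startingOffsets", StartingOffsets.toJson(...))`, and it takes effect only when the checkpoint directory is empty or new.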

    "What is the right/convenient way to read stream from Kafka (Confluent) topic? (I'm not considering offsets storing engine of Kafka)"

    Asking about "the right way" might lead to opinion-based answers and is therefore off-topic on Stack Overflow. That said, in my experience Spark Structured Streaming is a mature and production-ready approach. It is also worth looking into Kafka Connect.