Tags: apache-spark, apache-kafka, spark-structured-streaming

Max offsets in each Micro Batch


I have a streaming query running with the default trigger. My goal is to limit the volume read in each micro-batch so that no single batch becomes huge. Sometimes my Spark jobs are stopped for a whole weekend, so when I restart them, the first micro-batch takes a very long time to finish. I also persist the DataFrames because they are written to two databases. I tested two approaches.

The official docs say that maxOffsetsPerTrigger limits the number of offsets processed per trigger interval, but that did not work for me. Did I misunderstand the meaning of this parameter?

  val read = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaServers)
    .option("subscribe", kafkaTopic)
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .option("maxOffsetsPerTrigger", "1")
    .load()

I also read this answer, but I do not know where and how to set max.poll.records correctly. I tried setting it as an option on the readStream, with no success:

  val read = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaServers)
    .option("subscribe", kafkaTopic)
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .option("max.poll.records", "1")
    .load()
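A likely reason this attempt had no effect: the Spark Kafka integration guide says Kafka's own configurations are set through DataStreamReader.option with a `kafka.` prefix, so a bare `max.poll.records` key is not passed to the consumer. The pure-Scala sketch below models that filtering; `KafkaParamsSketch` and `consumerParams` are hypothetical names, not Spark's code. Also note that, as far as I understand, even `kafka.max.poll.records` only caps how many records a single `poll()` returns on an executor. The offset range of each micro-batch is chosen on the driver, so it should not make the batch itself smaller.

```scala
// Simplified model (not Spark's actual code) of how the Kafka source picks
// consumer settings out of readStream options: only keys starting with
// "kafka." are forwarded, with that prefix stripped.
object KafkaParamsSketch {
  private val Prefix = "kafka."

  def consumerParams(options: Map[String, String]): Map[String, String] =
    options.collect {
      case (key, value) if key.startsWith(Prefix) => key.drop(Prefix.length) -> value
    }

  def main(args: Array[String]): Unit = {
    val options = Map(
      "kafka.bootstrap.servers" -> "broker:9092", // hypothetical broker address
      "subscribe"               -> "events",      // hypothetical topic name
      "max.poll.records"        -> "1",           // no prefix: not forwarded
      "kafka.max.poll.records"  -> "500"          // forwarded as max.poll.records
    )
    // Contains bootstrap.servers -> broker:9092 and max.poll.records -> 500.
    println(consumerParams(options))
  }
}
```

In the real reader, the equivalent would be `.option("kafka.max.poll.records", "500")`, but for bounding the size of a micro-batch, maxOffsetsPerTrigger is the relevant option.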

Main function:

override def execute(spark: SparkSession, args: Array[String]): Unit = {
    val basePath: String = args(0)
    val kafkaServers: String = args(1)
    val kafkaTopic: String = args(2)
    val checkpoint: String = args(3)

    val read = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaServers)
      .option("subscribe", kafkaTopic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("maxOffsetsPerTrigger", "1")
      .load()

    val transformed = read
      .transform(applySchema)
      .transform(conversions)
      .transform(dropDuplicates)
      .transform(partitioning)

    val sink = new FileSystemSink(basePath)

    val query = transformed
      .writeStream
      .outputMode(OutputMode.Append)
      .foreachBatch(sink.writeOnS3 _)
      .option("checkpointLocation", f"$basePath/checkpoints/$checkpoint")
      .start()

    query.awaitTermination()
  }

Besides the questions above, what is the correct way to limit the number of offsets read in each micro-batch?

Spark Version: 2.4.5.


Solution

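  • I tested again and maxOffsetsPerTrigger worked just fine. I had misread the result of the trigger, and now it makes sense: the parameter caps the total number of offsets read per trigger across all partitions of the topic, not the number of offsets per partition. According to the Spark docs, that total is split proportionally across topic partitions of different volume.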
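To make the "total, not per partition" behaviour concrete, here is a simplified pure-Scala model of splitting a total budget such as maxOffsetsPerTrigger across partitions in proportion to their backlog. It is based on my reading of the Kafka source's rate limiting in Spark 2.x, not a copy of it: the real code uses `TopicPartition` keys and also handles newly added partitions and overflow. `RateLimitSketch`, the partition names, and the numbers are hypothetical.

```scala
// Simplified model of splitting a total offset budget (maxOffsetsPerTrigger)
// across topic partitions in proportion to their backlog. Assumptions:
// partitions are plain strings and `from` contains every key in `until`.
object RateLimitSketch {

  /** End offset to read up to for each partition in one micro-batch.
    *
    * @param limit total budget across all partitions (maxOffsetsPerTrigger)
    * @param from  offset each partition starts reading from
    * @param until latest available offset per partition
    */
  def rateLimit(limit: Long,
                from: Map[String, Long],
                until: Map[String, Long]): Map[String, Long] = {
    // Backlog per partition; partitions with nothing new are left out.
    val sizes: Map[String, Long] = until.collect {
      case (tp, end) if end - from(tp) > 0 => tp -> (end - from(tp))
    }
    val total = sizes.values.sum.toDouble
    if (total < 1) {
      until // nothing to read: every partition stays at its latest offset
    } else {
      until.map { case (tp, end) =>
        val newEnd = sizes.get(tp) match {
          case Some(size) =>
            // Proportional share of the total budget.
            val prorate = limit * (size / total)
            // Round tiny shares up (so no partition starves), others down.
            val share =
              (if (prorate < 1) math.ceil(prorate) else math.floor(prorate)).toLong
            math.min(end, from(tp) + share) // never go past the latest offset
          case None => end
        }
        tp -> newEnd
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical backlog after a weekend stop: p0 has 300 offsets waiting,
    // p1 has 100, and maxOffsetsPerTrigger is 100.
    val from  = Map("p0" -> 0L, "p1" -> 0L)
    val until = Map("p0" -> 300L, "p1" -> 100L)
    val ends  = rateLimit(100, from, until)
    val read  = ends.toSeq.map { case (tp, end) => end - from(tp) }.sum
    println(s"ends = $ends, offsets read = $read") // offsets read = 100 (75 + 25)
  }
}
```

With the question's value of `1` and three partitions that each have a backlog, each share is 1/3, which this model rounds up to 1, so the batch reads 3 offsets. If your Spark version rounds the same way, very small limits are only approximate; for larger limits the batch stays close to the configured total.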