spark-structured-streaming

How to send Parquet to Kafka in batches using Spark Structured Streaming?


I'm reading Parquet files, converting them to JSON, and sending the result to Kafka. The problem is that the whole Parquet dataset is read and sent to Kafka in one go, but I want to send the JSON data line by line or in batches:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.sql.streaming.Trigger

object WriteParquet2Kafka {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("yarn")
      .appName("Write Parquet to Kafka")
      .getOrCreate()

    import spark.implicits._

    // read the parquet files as a stream
    val ds: DataFrame = spark.readStream
      .schema(parquetSchema)
      .parquet(pathToParquetFile)

    // key = the "vin" column, value = all columns serialized to a JSON string
    val df: DataFrame = ds
      .select($"vin" as "key", to_json(struct(ds.columns.map(col(_)): _*)) as "value")
      .filter($"key".isNotNull)

    val ddf = df
      .writeStream
      .format("kafka")
      .option("topic", topics)
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("checkpointLocation", "/tmp/test")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    ddf.awaitTermination()
  }
}

Is it possible to do this?


Solution

  • I finally figured out how to solve this: just add an option and set a suitable value for maxFilesPerTrigger:

        val df: DataFrame = spark
          .readStream
          .option("maxFilesPerTrigger", 1)
          .schema(parquetSchema)
          .parquet(parquetUri)
    

    Note: set maxFilesPerTrigger to 1 so that each trigger reads exactly one Parquet file; a full end-to-end sketch follows below.
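
For completeness, here is a minimal end-to-end sketch that combines the maxFilesPerTrigger fix with the Kafka sink from the question. The schema (a made-up two-column layout with vin and speed), the input path /data/parquet, the topic my-topic, and the checkpoint directory are placeholders rather than values from the original post, and the 10-second trigger interval is just an example:

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.{col, struct, to_json}
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

    object ParquetToKafkaInBatches {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .master("yarn")
          .appName("Parquet to Kafka in batches")
          .getOrCreate()

        import spark.implicits._

        // Placeholder schema -- replace with the real schema of your parquet files.
        val parquetSchema = new StructType()
          .add("vin", StringType)
          .add("speed", DoubleType)

        val ds: DataFrame = spark.readStream
          .option("maxFilesPerTrigger", 1) // read at most one new parquet file per micro-batch
          .schema(parquetSchema)
          .parquet("/data/parquet") // placeholder input directory

        // The Kafka sink expects "key" and "value" columns; serialize all columns to a JSON string.
        val query = ds
          .select($"vin" as "key", to_json(struct(ds.columns.map(col(_)): _*)) as "value")
          .filter($"key".isNotNull)
          .writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("topic", "my-topic") // placeholder topic
          .option("checkpointLocation", "/tmp/parquet-to-kafka-checkpoint")
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .start()

        query.awaitTermination()
      }
    }

With maxFilesPerTrigger set to 1 and a 10-second processing-time trigger, each micro-batch picks up at most one new Parquet file and publishes its rows to Kafka, instead of pushing the whole dataset at once.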