Tags: scala, apache-spark, hadoop, spark-structured-streaming

Spark Structured Streaming join of a CSV file stream and a rate stream takes too much time per batch


I have a rate stream and a CSV file stream, joined on the rate stream's value and the CSV file's id:

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

// Build the session once, outside the reader, so the implicit parameter
// below is actually used instead of being shadowed by a local val.
implicit val spark: SparkSession = SparkSession
  .builder
  .master("local[1]")
  .config(new SparkConf().setIfMissing("spark.master", "local[1]")
    .set("spark.eventLog.dir", "file:///tmp/spark-events"))
  .getOrCreate()

def readFromCSVFile(path: String)(implicit spark: SparkSession): DataFrame = {
  val schema = StructType(
    StructField("id", LongType, nullable = false) ::
    StructField("value1", LongType, nullable = false) ::
    StructField("another", DoubleType, nullable = false) :: Nil)

  spark
    .readStream
    .format("csv")
    .option("header", value = true)
    .schema(schema)
    .option("delimiter", ",")
    .option("maxFilesPerTrigger", 1)
    .load(path)
}

val rate = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 10)
  .load()
  .withWatermark("timestamp", "1 seconds")

val csvStream = readFromCSVFile(tmpPath.toString)
val joined = csvStream.as("csv")
  .join(rate.as("counter"))
  .where("csv.id == counter.value")
  .withWatermark("timestamp", "1 seconds")

joined
  .writeStream
  // Trigger.ProcessingTime(10) means 10 *milliseconds*, which matches the
  // "trigger interval is 10 milliseconds" warning in the log below.
  .trigger(Trigger.ProcessingTime(10))
  .format("console")
  .option("truncate", "false")
  .queryName("kafkaDataGenerator")
  .start()
  .awaitTermination(300000)

The CSV file is only 6 lines long, yet processing one batch takes about 100 s:

2021-10-15 23:21:29 WARN  ProcessingTimeExecutor:69 - Current batch is falling behind. The trigger interval is 10 milliseconds, but spent 92217 milliseconds
-------------------------------------------
Batch: 1
-------------------------------------------
+---+------+-------+-----------------------+-----+
|id |value1|another|timestamp              |value|
+---+------+-------+-----------------------+-----+
|6  |2     |3.0    |2021-10-15 20:20:02.507|6    |
|5  |2     |2.0    |2021-10-15 20:20:01.507|5    |
|1  |1     |1.0    |2021-10-15 20:19:57.507|1    |
|3  |1     |3.0    |2021-10-15 20:19:59.507|3    |
|2  |1     |2.0    |2021-10-15 20:19:58.507|2    |
|4  |2     |1.0    |2021-10-15 20:20:00.507|4    |
+---+------+-------+-----------------------+-----+

How can I optimize the join operation so that this batch is processed faster? The computation itself should be trivial, so it looks as if some kind of hidden watermarking or other mechanism is making the batch wait for about 100 s. What options or properties can be applied?
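To make the question concrete, this is the kind of setting I mean; the property and value below are an illustrative guess, not something I have verified:

// Illustrative guess, not a verified fix: a stream-stream join shuffles into
// spark.sql.shuffle.partitions tasks (200 by default), so on local[1] a tiny
// batch can spend most of its wall time scheduling near-empty shuffle tasks.
spark.conf.set("spark.sql.shuffle.partitions", "1")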


Solution

  • I would suggest that you don't have enough data to look into performance yet. Why not crank the data up to 500,000 rows and see whether you still have an issue? Right now I'm concerned that you aren't running enough data to exercise your system's performance effectively, and that the startup costs aren't being amortized by the volume of data; a sketch for generating that much test data follows below.
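A minimal sketch for producing a 500,000-row CSV that matches the question's schema; the payload expressions and the /tmp/csv-input output path are assumptions, not part of the original post:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.master("local[*]").getOrCreate()

// 500,000 rows with the question's columns: id, value1, another.
// The payload values are arbitrary and the output path is an assumption.
spark.range(1L, 500001L).toDF("id")
  .withColumn("value1", col("id") % 10)
  .withColumn("another", (col("id") % 7).cast("double"))
  .coalesce(1)                     // one part file, like the original single CSV
  .write
  .option("header", "true")
  .mode("overwrite")
  .csv("/tmp/csv-input")

Note that the rate source emits one incrementing value per second starting from 0, so ids this large would only match after a very long run; for a pure throughput test it may make sense to raise rowsPerSecond as well.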