Tags: apache-spark, spark-streaming, spark-structured-streaming

How to check StreamingQuery performance metrics in Structured Streaming?


I want to get info like triggerExecution, inputRowsPerSecond, numInputRows, processedRowsPerSecond from a Streaming query.

I am using the rate source to generate 10 rows per second, and QueryProgressEvent to get all the metrics.

However, when I print QueryProgressEvent.inputRowsPerSecond to the console, I get unexpected values such as 625.0 and 666.66.

Can someone explain why it reports such values?

Code and sample output below:

import org.apache.spark.sql.DataFrame
import spark.implicits._  // needed for the $"..." column syntax

spark.streams.addListener(new EventMetric())

val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .option("numPartitions", 1)
  .load()
  .select($"value", $"timestamp")

df.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/testjob")
  .foreachBatch((batchDf: DataFrame, batchId: Long) => {
    // rowCountAcc is defined elsewhere in the job (not shown here)
    println("rowcount value >>>> " + rowCountAcc.value)
    val outputDf = batchDf
    outputDf.write
      .format("console")
      .mode("append")
      .save()
  })
  .start()
  .awaitTermination()

StreamingQueryListener:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

class EventMetric extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
  }

  // Called after every micro-batch with that batch's progress metrics
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
//    println("id : " + p.id)
    println("runId : "  + p.runId)
//    println("name : " + p.name)
    println("batchid : " + p.batchId)
    println("timestamp : " + p.timestamp)
    println("triggerExecution" + p.durationMs.get("triggerExecution"))
    println(p.eventTime)
    println("inputRowsPerSecond : " + p.inputRowsPerSecond)
    println("numInputRows : " + p.numInputRows)
    println("processedRowsPerSecond : " + p.processedRowsPerSecond)
    println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
  }
}

OUTPUT 1:

runId : bc7f97c1-687f-4125-806a-dc573e006dcd
batchid : 164
timestamp : 2020-12-12T12:31:14.323Z
triggerExecution453
{}
inputRowsPerSecond : 625.0
numInputRows : 10
processedRowsPerSecond : 22.075055187637968

OUTPUT 2:

runId : bc7f97c1-687f-4125-806a-dc573e006dcd
batchid : 168
timestamp : 2020-12-12T12:31:18.326Z
triggerExecution453
{}
inputRowsPerSecond : 666.6666666666667
numInputRows : 10
processedRowsPerSecond : 22.075055187637968

EDIT:

Also, if 625 is the input rate, then why is processedRowsPerSecond so low for this job which actually is doing no transformation?


UPDATE :: OUTPUT WITH PRETTY JSON:

Batch 1:

runId : 16c82066-dea0-4e0d-8a1e-ad1df55ad516
batchid : 198
timestamp : 2020-12-13T16:23:14.331Z
triggerExecution422
{}
inputRowsPerSecond : 666.6666666666667
numInputRows : 10
processedRowsPerSecond : 23.696682464454977
json : {
  "id" : "f8af5400-533c-4f7f-8b01-b365dc736735",
  "runId" : "16c82066-dea0-4e0d-8a1e-ad1df55ad516",
  "name" : null,
  "timestamp" : "2020-12-13T16:23:14.331Z",
  "batchId" : 198,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 666.6666666666667,
  "processedRowsPerSecond" : 23.696682464454977,
  "durationMs" : {
    "addBatch" : 47,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 0,
    "setOffsetRange" : 0,
    "triggerExecution" : 422,
    "walCommit" : 234
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=1",
    "startOffset" : 212599,
    "endOffset" : 212600,
    "numInputRows" : 10,
    "inputRowsPerSecond" : 666.6666666666667,
    "processedRowsPerSecond" : 23.696682464454977
  } ],
  "sink" : {
    "description" : "ForeachBatchSink"
  }
}
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

Batch 2:

runId : 16c82066-dea0-4e0d-8a1e-ad1df55ad516
batchid : 191
timestamp : 2020-12-13T16:23:07.328Z
triggerExecution421
{}
inputRowsPerSecond : 625.0
numInputRows : 10
processedRowsPerSecond : 23.752969121140143
json : {
  "id" : "f8af5400-533c-4f7f-8b01-b365dc736735",
  "runId" : "16c82066-dea0-4e0d-8a1e-ad1df55ad516",
  "name" : null,
  "timestamp" : "2020-12-13T16:23:07.328Z",
  "batchId" : 191,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 625.0,
  "processedRowsPerSecond" : 23.752969121140143,
  "durationMs" : {
    "addBatch" : 62,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 16,
    "setOffsetRange" : 0,
    "triggerExecution" : 421,
    "walCommit" : 187
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=1",
    "startOffset" : 212592,
    "endOffset" : 212593,
    "numInputRows" : 10,
    "inputRowsPerSecond" : 625.0,
    "processedRowsPerSecond" : 23.752969121140143
  } ],
  "sink" : {
    "description" : "ForeachBatchSink"
  }
}
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

Solution

  • Keep in mind that generating 10 rows per second at the source says nothing about the input rate reported for your overall streaming query.

    In your writeStream call you do not set a Trigger, which means a new micro-batch is triggered as soon as the previous one is done and new data is available.

    The streaming query apparently does not need a whole second to read those 10 rows, only a fraction of it. "inputRowsPerSecond" is more a measure of how fast the input data is read. You may also see different values in different batches because of rounding issues. Check the "timestamp" field in your output: the interval between batches is not exactly one second but usually off by a few milliseconds.

    It takes the job just a few milliseconds to read the data, and this can vary slightly from batch to batch. In batch 164 it took the job 16 ms and in batch 168 it took 15 ms to read in the 10 messages.

    Batch 164 => 10 / 0.016 s = 625 messages per second

    Batch 168 => 10 / 0.015 s = 666.6667 messages per second
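The two divisions above can also be run backwards to see which time span a reported rate implies. The sketch below is plain Scala with no Spark dependency; `ImpliedInterval` and `impliedMillis` are hypothetical names made up for this illustration, and the numbers are the ones from the question's output.

```scala
object ImpliedInterval {
  // Hypothetical helper, not part of Spark: the time span in milliseconds
  // that a reported inputRowsPerSecond implies for a given number of rows.
  def impliedMillis(numInputRows: Long, inputRowsPerSecond: Double): Double =
    numInputRows * 1000.0 / inputRowsPerSecond

  def main(args: Array[String]): Unit = {
    // Values taken from OUTPUT 1 (batch 164) and OUTPUT 2 (batch 168)
    val batch164 = impliedMillis(10L, 625.0)
    val batch168 = impliedMillis(10L, 666.6666666666667)
    println(s"batch 164: about ${math.round(batch164)} ms")
    println(s"batch 168: about ${math.round(batch168)} ms")
  }
}
```

A difference of a single millisecond in that span is enough to move the reported rate from 625 to roughly 667, which is why the value jumps between batches.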
    
    

    processedRowsPerSecond is calculated from triggerExecution (in milliseconds) and the number of rows in the batch. For batch 191:

    1000 / triggerExecution x 10 msg = 1000 / 421 x 10 msg = 23.752969
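To check that this formula matches every batch shown in the question, here is a minimal sketch in plain Scala (no Spark needed). `ProcessedRate` and its helper are hypothetical names for this illustration and only mirror the arithmetic above; they are not Spark's own code. The triggerExecution and reported values are copied from the question's output.

```scala
object ProcessedRate {
  // Hypothetical helper mirroring the formula above:
  // rows in the batch divided by triggerExecution converted to seconds.
  def processedRowsPerSecond(numInputRows: Long, triggerExecutionMs: Long): Double =
    numInputRows * 1000.0 / triggerExecutionMs

  def main(args: Array[String]): Unit = {
    // (batchId, triggerExecution in ms, reported processedRowsPerSecond)
    val batches = Seq(
      (164L, 453L, 22.075055187637968),
      (191L, 421L, 23.752969121140143),
      (198L, 422L, 23.696682464454977)
    )
    for ((id, ms, reported) <- batches) {
      val computed = processedRowsPerSecond(10L, ms)
      println(s"batch $id: computed=$computed reported=$reported")
    }
  }
}
```

Most of triggerExecution in these batches is not spent on the data itself: in batch 198, for example, walCommit accounts for 234 ms of the 422 ms while addBatch takes 47 ms, which is why the processed rate looks low even though the job does no transformation.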