Tags: scala, performance, apache-spark, jvm, spark-streaming

Spark DF processing time: why does processing time drop when looping the same operation?


I am trying to do a "naive" time measurement (endTime - startTime) of some Spark operations, to compare different alternatives for the same logic.

import java.util.concurrent.TimeUnit.NANOSECONDS

def measureElapsedTime[T](f: => T): (T, Long) = {
  val start = System.nanoTime()
  val result: T = f   // force evaluation of the by-name argument
  val end = System.nanoTime()

  (result, NANOSECONDS.toMillis(end - start))
}

I am getting a decreasing series of processing times from the loop. Why does this happen?

Let's say, for example, I am measuring the following simple operation:

import org.apache.spark.sql.Row

def t1(): (Array[Row], Long) = measureElapsedTime {
  val res = spark.read.json(nestedDF)
  res.collect()
}

Measuring it the following way:

val n = 1000
def avg(f: () => (Array[Row], Long)) = 1.to(n).map(_ => f()._2).sum / n
println(avg(t1))

This gives approximately 45 ms on average.
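
(Just as a side note, and not what produced the numbers below: to keep warm-up out of the mean I could also drop the first runs, something like the following sketch, where the cutoff of 100 is an arbitrary choice.)

def avgAfterWarmup(f: () => (Array[Row], Long), warmup: Int = 100): Long = {
  // discard the first `warmup` measurements so warm-up effects don't skew the mean
  // (warmup = 100 is arbitrary, purely for illustration)
  val times = 1.to(n).map(_ => f()._2).drop(warmup)
  times.sum / times.size
}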

But if I take a look at the head of the series, I clearly see a decrease in processing time. For example, these are the measured times in ms, in order of execution, for the first runs:

707
157
144
119
153
108
99
105
121
107
132
89
96
100
83
93
87
94
73

Why does this pattern happen? Is this due to JVM warm-up time? Is this due to some spark optimization?

I don't think the reason is JVM warm-up time, because it doesn't happen for operations that don't reuse the "same" DF (and in any case, I ran some other operations before the previous example to warm up the JVM). For example, the following gives a virtually stable running time:

import org.apache.spark.sql.Dataset
import ujson.Value

def t2(): (Array[Row], Long) = measureElapsedTime {
  // round-trip each JSON string through ujson before handing it to spark.read.json
  // (nestedDF.map needs the String encoder from spark.implicits._, imported by default in spark-shell)
  val res: Dataset[String] = nestedDF.map((str: String) => {
    val json: Value = ujson.read(str)
    ujson.write(json)
  })
  spark.read.json(res).collect()
}

The running time series for this is:

44
141
93
92
79
78
79
84
76
80
78
77
71
71
70
71
103
74
69
72

Question background: I am measuring operations for a Spark Structured Streaming app, so I expect that each micro-batch would pay something close to the first measured value, since every micro-batch is a new DF. However, I might be wrong, which is why I am asking this question.
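
For context, this is roughly how I would time the operation per micro-batch with foreachBatch. It is only a sketch: the textFile source, the paths, and the timeBatch name are placeholders and not my real pipeline; it reuses the measureElapsedTime from above.

import org.apache.spark.sql.Dataset

// Placeholder streaming source of JSON lines; path and checkpoint location are illustrative.
val streamingNestedDF: Dataset[String] = spark.readStream.textFile("/tmp/json-input")

// A typed function val avoids the Scala/Java foreachBatch overload ambiguity in Scala 2.12.
val timeBatch: (Dataset[String], Long) => Unit = (batch, batchId) => {
  val (_, elapsedMs) = measureElapsedTime {
    spark.read.json(batch).collect()
  }
  println(s"micro-batch $batchId: $elapsedMs ms")
}

val query = streamingNestedDF.writeStream
  .foreachBatch(timeBatch)
  .option("checkpointLocation", "/tmp/measure-checkpoint")
  .start()
query.awaitTermination()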

Thank you all.


Clarifications:

nestedDF in the examples above is a Dataset[String] with JSON string values.

+-----------------------------------------------------------------------------------------------+
|value                                                                                          |
+-----------------------------------------------------------------------------------------------+
|{"simple":  "a", "nested":  {"c1":  "a"}}                                                      |
|{"simple":  "a", "more-nested":  {"c1": {"c11": {"c111":  "a"}}}}                              |
|{"simple":  "a", "nested-with-diff-types": {"array": ["a", "b"], "obj": {"c1": {"c11":  "a"}}}}|
|{"simple":  "a", "nested-with-array":  {"c1": {"c11": ["a", "b", "c"]}}}                       |
|{"simple":  "a", "nested-with-array-with-obj-elem":  {"c1": {"c11": [{"a": "a"}, {"b": "b"}]}}}|
+-----------------------------------------------------------------------------------------------+

Solution

  • Your code essentially runs the following Spark code (the rest around it is just Scala) in a loop:

    val res = spark.read.json(nestedDF)
    res.collect()
    

    Spark optimizations?

    To understand whether Spark is doing any optimization, let's use the explain method, which shows the physical query execution plan Spark will use. The following was run in a Spark v3.3.1 spark-shell:

    val nestedDF = Seq(
      """{"simple":  "a", "nested":  {"c1":  "a"}}""",
      """{"simple":  "a", "more-nested":  {"c1": {"c11": {"c111":  "a"}}}}""",
      """{"simple":  "a", "nested-with-diff-types": {"array": ["a", "b"], "obj": {"c1": {"c11":  "a"}}}}""",
      """{"simple":  "a", "nested-with-array":  {"c1": {"c11": ["a", "b", "c"]}}}""",
      """{"simple":  "a", "nested-with-array-with-obj-elem":  {"c1": {"c11": [{"a": "a"}, {"b": "b"}]}}}"""
    ).toDF.as[String]
    
    val res = spark.read.json(nestedDF)
    
    res.explain
    == Physical Plan ==
    *(1) Scan ExistingRDD[more-nested#9,nested#10,nested-with-array#11,nested-with-array-with-obj-elem#12,nested-with-diff-types#13,simple#14]
    
    res.collect
    
    val res = spark.read.json(nestedDF)
    
    res.explain
    == Physical Plan ==
    *(1) Scan ExistingRDD[more-nested#28,nested#29,nested-with-array#30,nested-with-array-with-obj-elem#31,nested-with-diff-types#32,simple#33]
    
    res.collect
    

    Conclusion

    As you can see, the physical plan is exactly the same for both runs (only the expression IDs differ), so no extra optimization is happening at the Spark level.
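
    (As a small aside that goes beyond what I actually ran: if you want to compare the plans programmatically instead of eyeballing the explain output, you can read them from queryExecution and strip the expression IDs, which are the only thing that differs between runs. The planOf helper below is just illustrative.)

    // Illustrative helper: render the executed plan and drop expression IDs (#9, #28, ...)
    def planOf(df: org.apache.spark.sql.DataFrame): String =
      df.queryExecution.executedPlan.toString.replaceAll("#\\d+", "")

    val planA = planOf(spark.read.json(nestedDF))
    val planB = planOf(spark.read.json(nestedDF))
    planA == planB   // true: the two runs produce structurally identical plans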

    JVM optimizations?

    How Apache Spark turns your code into Java bytecode is beyond what I know, but we can use our knowledge of the JVM to figure out whether some optimization is going on there.

    Since we're talking about code that gets executed more and more quickly, we can look specifically at the code cache. This is the memory region where the JVM stores the native code that the JIT compiler produces when compiling Java bytecode.
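
    (You can also read the code cache occupancy from inside the JVM via the standard memory pool MXBeans. The snippet below is a sketch I'm adding for reference, not how the graph below was produced, and it assumes a HotSpot JVM where the relevant pools have "code" in their name.)

    import java.lang.management.ManagementFactory
    import scala.collection.JavaConverters._

    // Current occupancy of the JIT code cache pool(s). Pool names are JVM-specific:
    // "Code Cache" on HotSpot 8, several "CodeHeap '...'" pools on Java 9+.
    ManagementFactory.getMemoryPoolMXBeans.asScala
      .filter(_.getName.toLowerCase.contains("code"))
      .foreach(pool => println(s"${pool.getName}: ${pool.getUsage.getUsed / 1024} KiB used"))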

    Using the jconsole tool (which is part of the Java 8 JDK), I monitored the code cache while running your code. I slightly adapted your measureElapsedTime as follows, to also record the absolute timestamp of each run:

    import java.time.Instant
    import java.util.concurrent.TimeUnit

    def measureElapsedTime[T](f: => T): (T, Long, Long) = {
      val now = Instant.now().toEpochMilli()
      val start = System.nanoTime()
      val result: T = f
      val end = System.nanoTime()
      (result, TimeUnit.NANOSECONDS.toMillis(end - start), now)
    }
    

    I also made a timeTaken method instead of your avg, to keep the value of each single run:

    // call f() once per iteration and keep (elapsed, timestamp) for each run
    def timeTaken(f: () => (Array[Row], Long, Long)) = 1.to(n).map(_ => f()).map(r => (r._2, r._3))
    

    Plotting the code cache utilization and the time each run took gives the following graph:

    [Plot: JVM code cache usage and the time taken per run, both plotted against wall-clock time]

    A few remarks:

    • I used a log scale for the y axis of the first graph, because otherwise the slight speedup after the initial drop would be less visible
    • The code cache graph starts before the time-taken graph, because I started my spark-shell a while before I actually started running the code

    Conclusion

    The code clearly speeds up, as you had already noticed. This speed-up is strongly correlated with the increased use of the code cache. That also makes sense: as the JIT compiler compiles more and more of your repeatedly executed code, each run executes more quickly.
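
    (If you want to double-check the JIT correlation from inside the shell, one option I did not use here is the CompilationMXBean, which reports the cumulative time the JVM has spent JIT-compiling. Sampled before and after chunks of runs, the delta should shrink as the loop stabilizes. This is just a sketch, and it only gives meaningful numbers where compilation time monitoring is supported.)

    import java.lang.management.ManagementFactory

    val jit = ManagementFactory.getCompilationMXBean   // may be null on JVMs without a JIT
    // only meaningful if jit.isCompilationTimeMonitoringSupported
    val before = jit.getTotalCompilationTime
    // ... run a chunk of measured iterations here ...
    val after = jit.getTotalCompilationTime
    println(s"JIT compilation time spent during this chunk: ${after - before} ms")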