scala, apache-spark, rdd, lazy-evaluation, distributed-computing

How to time Spark program execution speed


I want to time my Spark program's execution speed, but laziness makes that quite difficult. Consider this (meaningless) code:

val graph = GraphLoader.edgeListFile(context, args(0))
// outerJoinVertices takes the mapping function as a second argument list;
// here each vertex simply keeps its degree (0 if it has none)
val graph_degs = graph.outerJoinVertices(graph.degrees)((_, _, deg) => deg.getOrElse(0)).triplets.cache

/* I'd need to start the timer here */
val t1 = System.currentTimeMillis  
val edges = graph_degs.flatMap(trip =>  { /* do something*/ })
                      .union(graph_degs)

val count = edges.count
val t2 = System.currentTimeMillis 
/* I'd need to stop the timer here */

println("It took " + t2-t1 + " to count " + count)

The thing is, transformations are lazy, so nothing actually gets evaluated before the val count = edges.count line. But t1 gets its value immediately, while the code above it still hasn't been evaluated: the transformations above t1 only run after the timer has started, regardless of their position in the code. That's a problem...

In the Spark Web UI I can't find anything useful, since I need the time spent after that specific line of code. Is there an easy way to see when a group of transformations actually gets evaluated?
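
The only workaround I can think of is forcing the cached RDD to materialize with a throwaway action before starting the timer, so that t1 only covers the transformations I actually care about - a sketch (foreachPartition with a no-op body is just one way to trigger evaluation):

// force the cached triplets to materialize before starting the timer
graph_degs.foreachPartition(_ => ())

val t1 = System.currentTimeMillis
/* flatMap / union / count as above */
val t2 = System.currentTimeMillis

But I don't know whether that's the right approach either.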


Solution

  • Since consecutive transformations (within the same task - meaning they are not separated by shuffles and are performed as part of the same action) are executed as a single "step", Spark does not measure them individually. And from driver code, you can't either.

    What you can do is measure the duration of applying your function to each record, and use an Accumulator to sum it all up, e.g.:

    // create accumulator
    val durationAccumulator = sc.longAccumulator("flatMapDuration")
    
    // "wrap" your "doSomething" operation with time measurement, and add to accumulator
    val edges = rdd.flatMap(trip => {
      val t1 = System.currentTimeMillis
      val result = doSomething(trip)
      val t2 = System.currentTimeMillis
      durationAccumulator.add(t2 - t1)
      result
    })
    
    // perform the action that would trigger evaluation
    val count = edges.count
    
    // now you can read the accumulated value
    println("It took " + durationAccumulator.value + " to flatMap " + count)
    

    You can repeat this for any individual transformation.
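
    For example, a second accumulator could isolate a subsequent map (doSomethingElse here is a hypothetical stand-in for your own logic):

    // one accumulator per transformation keeps the measurements separate
    val mapDurationAccumulator = sc.longAccumulator("mapDuration")

    val mapped = edges.map { e =>
      val start = System.currentTimeMillis
      val result = doSomethingElse(e) // hypothetical second operation
      mapDurationAccumulator.add(System.currentTimeMillis - start)
      result
    }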

    Disclaimers:

    • Of course, this will not include the time Spark spent shuffling things around and doing the actual counting - for that, indeed, the Spark UI is your best resource.
    • Note that accumulators are sensitive to things like retries - a retried task will update the accumulator twice.

    Style Note: You can make this code more reusable by creating a measure function that "wraps" around any function and updates a given accumulator:

    // write this once:
    import org.apache.spark.util.LongAccumulator

    def measure[T, R](action: T => R, acc: LongAccumulator): T => R = input => {
      val t1 = System.currentTimeMillis
      val result = action(input)
      val t2 = System.currentTimeMillis
      acc.add(t2 - t1)
      result
    }
    
    // use it with any transformation:
    rdd.flatMap(measure(doSomething, durationAccumulator))
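
    Putting it together - a sketch of how the helper might be wired end to end, with one accumulator per transformation (doSomethingElse is again a hypothetical placeholder):

    val flatMapAcc = sc.longAccumulator("flatMapDuration")
    val mapAcc = sc.longAccumulator("mapDuration")

    val result = rdd
      .flatMap(measure(doSomething, flatMapAcc))
      .map(measure(doSomethingElse, mapAcc))

    // the action triggers evaluation and populates both accumulators
    result.count

    println("flatMap took " + flatMapAcc.value + " ms, map took " + mapAcc.value + " ms")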