Tags: scala, apache-spark

Weird assignment to a variable in foreachPartition


I expect the transactionCount variable to end up as 100, but I get -1. My RDD always has exactly one partition. Here is the code that processes the RDD:

var transactionCount = -1

payment_rdd.foreachPartition { partitionOfRecords =>
  // this assigns 100 to transactionCount, since I have 100 records
  // in my RDD and therefore in my single partition
  transactionCount = partitionOfRecords.size
  partitionOfRecords.foreach { record =>
      // I process each record
  }
  try {
    // transactionCount should still be 100 here

    // another process
  }
  catch {
    case _: Throwable => {
      // execution never reaches this point
      log.error("Cannot process correctly")
      transactionCount = 0
    }
  }
}
return transactionCount

It returns -1 instead of 100, and I can't understand why. Do you have any idea, or a better solution? Thanks


Solution

  • When you execute this code:

    • Spark computes the closure.
    • It serializes each variable required by the closure and sends a copy to the executors.
    • When the code runs, each executor modifies its own local copy of the deserialized variable; the driver's copy is never updated.

    This is described and explained in the Spark programming guide.
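    A minimal plain-Scala sketch of why the update is lost: Java serialization stands in here for Spark shipping the closure to an executor, and Counter is a made-up holder for the captured variable.

    ```scala
    import java.io._

    // Made-up holder standing in for the state a Spark closure captures
    case class Counter(var value: Int) extends Serializable

    val driverCopy = Counter(-1)

    // Round-trip through serialization, as Spark does when it ships a closure
    val buf = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buf)
    oos.writeObject(driverCopy)
    oos.close()

    val ois = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    val executorCopy = ois.readObject().asInstanceOf[Counter]

    executorCopy.value = 100 // the "executor" updates its deserialized copy

    // driverCopy.value is still -1: the driver never sees the executor's write
    ```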

    Also, you cannot do this:

    transactionCount = partitionOfRecords.size
    

    Iterators can be traversed only once; computing size consumes the iterator, so it is empty by the time the subsequent foreach runs and no records get processed.
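    You can see this with a plain Scala Iterator, no Spark required:

    ```scala
    val partitionOfRecords = Iterator("tx1", "tx2", "tx3")

    val count = partitionOfRecords.size      // counting traverses the iterator...
    val leftover = partitionOfRecords.toList // ...so nothing is left afterwards

    // count is 3, but leftover is empty: a foreach after .size sees no records
    ```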

    I'd use Try and an accumulator:

    import scala.util.Try

    val transactionCount = spark.sparkContext.longAccumulator

    rdd.foreach { record =>
      if (Try {
        // your code goes here
      }.isSuccess) transactionCount.add(1L)
    }