Tags: multithreading, scala, race-condition

Why am I getting a race condition in multithreaded Scala?


I am trying to parallelise a p-norm calculation over an array.

To do that I tried the following. I understand I could solve this differently, but I am interested in understanding where the race condition occurs:

val toSum = Array(0,1,2,3,4,5,6)

// Calculate the sum over a segment of an array
def sumSegment(a: Array[Int], p:Double, s: Int, t: Int): Int = {
  val res = {for (i <- s until t) yield scala.math.pow(a(i), p)}.reduceLeft(_ + _)
  res.toInt
}

// Calculate the p-norm over an Array a
def parallelpNorm(a: Array[Int], p: Double): Double = {
  var acc = 0L

  // The worker who should calculate the sum over a slice of an array
  class sumSegmenter(s: Int, t: Int) extends Thread {
    override def run() {
      // Calculate the sum over the slice
      val subsum = sumSegment(a, p, s, t)
      // Add the sum of the slice to the accumulator in a synchronized fashion
      val x = new AnyRef{}
      x.synchronized {
        acc = acc + subsum
      }
    }
  }

  val split = a.size  / 2
  val seg_one = new sumSegmenter(0, split)
  val seg_two = new sumSegmenter(split, a.size)
  seg_one.start
  seg_two.start
  seg_one.join
  seg_two.join
  scala.math.pow(acc, 1.0 / p)
}
println(parallelpNorm(toSum, 2))

The expected output is 9.5393920142, but some runs give me 9.273618495495704 or even 2.23606797749979.

Any ideas about where the race condition could be happening?


Solution

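  • The problem has been explained in the previous answer. In short, x is created inside run(), so each thread synchronizes on its own lock object and nothing actually protects acc. A better way to avoid this race condition and improve performance is to use an AtomicInteger: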

    import java.util.concurrent.atomic.AtomicInteger

    // Calculate the p-norm over an Array a
    def parallelpNorm(a: Array[Int], p: Double): Double = {
      // Shared accumulator; its updates are atomic, so no explicit lock is needed
      val acc = new AtomicInteger(0)

      // The worker that calculates the sum over a slice of the array
      class sumSegmenter(s: Int, t: Int) extends Thread {
        override def run(): Unit = {
          // Calculate the sum over the slice
          val subsum = sumSegment(a, p, s, t)
          // Atomically add the sum of the slice to the accumulator
          acc.getAndAdd(subsum)
        }
      }

      val split = a.length / 2
      val seg_one = new sumSegmenter(0, split)
      val seg_two = new sumSegmenter(split, a.length)
      seg_one.start()
      seg_two.start()
      // join() guarantees both updates are visible before acc is read
      seg_one.join()
      seg_two.join()
      scala.math.pow(acc.get().toDouble, 1.0 / p)
    }
    

    Modern processors can perform atomic operations without blocking, which can be much faster than explicit synchronisation. In my tests this ran twice as fast as the original code (with x placed correctly, i.e. created once outside run() so that both threads share the same lock).
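For comparison, here is a minimal sketch of what "correct placement of x" could look like if you keep the synchronized approach. The names SharedLockPNorm, lock, SumSegmenter, segOne and segTwo are made up for this illustration, and sumSegment is rewritten slightly so the snippet is self-contained. The only essential change from the question's code is that the lock object is created once, outside run(), so both threads contend for the same monitor.

```scala
object SharedLockPNorm {
  // Sum of a(i)^p over the half-open index range [s, t), truncated to Int as in the question
  def sumSegment(a: Array[Int], p: Double, s: Int, t: Int): Int =
    (s until t).map(i => math.pow(a(i), p)).sum.toInt

  def parallelpNorm(a: Array[Int], p: Double): Double = {
    var acc = 0L
    // Created once, outside run(): both workers synchronize on this same object
    val lock = new AnyRef

    class SumSegmenter(s: Int, t: Int) extends Thread {
      override def run(): Unit = {
        val subsum = sumSegment(a, p, s, t)
        // The read-modify-write of acc is now mutually exclusive across threads
        lock.synchronized {
          acc = acc + subsum
        }
      }
    }

    val split = a.length / 2
    val segOne = new SumSegmenter(0, split)
    val segTwo = new SumSegmenter(split, a.length)
    segOne.start()
    segTwo.start()
    // join() makes each worker's write to acc visible to this thread
    segOne.join()
    segTwo.join()
    math.pow(acc.toDouble, 1.0 / p)
  }

  def main(args: Array[String]): Unit =
    println(parallelpNorm(Array(0, 1, 2, 3, 4, 5, 6), 2))
}
```

This also accounts for the wrong values in the question: the two slices sum to 5 and 86, so 9.2736... is the square root of 86 and 2.2360... is the square root of 5, which is what you would see if one thread's update to acc were lost. With a shared lock (or the AtomicInteger above) both partial sums are kept and the result should be about 9.539, the square root of 91.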