Tags: scala, parallel-processing, apache-spark, cpu-cores

Spark: Different output with different number of cores


I'm seeing strange behaviour when I change the number of cores in my Spark application. Here is the code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
object Test extends App {
  Logger.getLogger("org").setLevel(Level.WARN)
  var listLink: List[String] = List()
  def addListLink(s: String) = {
    val list = s.split(",")
    for (i <- 0 to list.length - 2) {
      listLink = list(i) + "-" + list(i + 1) :: listLink
    }
  }
  val conf = new SparkConf().setMaster("local[1]").setAppName("Simple Application")
  val sc = new SparkContext(conf)
  val paths = sc.textFile("file:///tmp/percorsi.txt")
  paths.foreach(x => addListLink(x))
  println("Number of items:"+listLink.size)
  println(listLink)
}

My input file is something like this:

A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C
A,B,C
A,B,C
A,B,C
A,B,C
B,C,D
B,C,D
B,C,D
B,C,D
C,D
C,D

Basically, for every path I call my method, which adds one element to a list for every pair of consecutive items:

Example: "A,B,C,D" => ("A-B", "B-C", "C-D")
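
The mapping described above can be sketched as a pure function that has no shared state and does not depend on Spark. The names `PairSketch` and `consecutivePairs` are hypothetical, and the sketch assumes that a line with fewer than two fields should produce no pairs:

```scala
object PairSketch {
  // Builds "X-Y" for every two neighbouring fields of a comma-separated path.
  // Assumption: a line with fewer than two fields yields no pairs.
  def consecutivePairs(line: String): List[String] = {
    val fields = line.split(",")
    if (fields.length < 2) Nil
    else fields.sliding(2).map(_.mkString("-")).toList
  }

  def main(args: Array[String]): Unit = {
    println(consecutivePairs("A,B,C,D")) // List(A-B, B-C, C-D)
  }
}
```

Because the function returns its result instead of appending to a variable, it can be applied to each line independently.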

As you can see, the code uses just one core:

.setMaster("local[1]")

If I run my application (locally or on a cluster), I get what I expect:

println("Number of items:"+listLink.size)
//Result --> Number of items:38

If I change the number of cores to 3 (for example), I get different values: for instance, 33 items instead of 38.

Am I missing something about the number of cores, or about something else (partitions, etc.)?

I think this is quite a simple app, but I get this strange behaviour anyway.

Could anyone help me?

Thanks in advance

FF


Solution

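  • The function passed to foreach runs in parallel tasks, one per partition, and those tasks do not safely share listLink with the driver. Items end up in separate copies of the list (or concurrent updates to it are lost), so what gets printed at the end is only part of the result.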

    Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program.

    (Source: https://spark.apache.org/docs/latest/programming-guide.html#shared-variables)
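
    The same section of the guide describes accumulators as the supported way to send updates from tasks back to the driver. If only the number of pairs were needed, a minimal sketch could look like the following. It assumes Spark 2.0 or later (for `longAccumulator`); `AccumulatorSketch` and `countPairs` are hypothetical names, and the three sample lines are illustrative:

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    object AccumulatorSketch {
      // Counts consecutive pairs with an accumulator instead of a shared var.
      // Assumption: Spark 2.0+ (longAccumulator does not exist in 1.x).
      def countPairs(sc: SparkContext, lines: Seq[String]): Long = {
        val pairCount = sc.longAccumulator("pairCount")
        sc.parallelize(lines, 3).foreach { line =>
          val fields = line.split(",")
          // A line with n fields contributes n - 1 pairs (never fewer than 0).
          pairCount.add(math.max(fields.length - 1, 0).toLong)
        }
        // Read the merged value on the driver, after the action has finished.
        pairCount.value
      }

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[3]").setAppName("accumulator-sketch")
        val sc = new SparkContext(conf)
        println(countPairs(sc, Seq("A,B,C,D", "A,B,C", "C,D"))) // 3 + 2 + 1 = 6
        sc.stop()
      }
    }
    ```

    An accumulator only aggregates a value; it is not a replacement for building the list of pairs itself.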

    Instead of mutating a shared list, build the pairs with RDD transformations:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    val data = List(
    "A,B,C,D",
    "A,B,C,D",
    "A,B,C,D",
    "A,B,C,D",
    "A,B,C,D",
    "A,B,C,D",
    "A,B,C",
    "A,B,C",
    "A,B,C",
    "A,B,C",
    "A,B,C",
    "B,C,D",
    "B,C,D",
    "B,C,D",
    "B,C,D",
    "C,D",
    "C,D")
    
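    // Run locally; "local" uses a single worker thread.
    val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
    val sc = new SparkContext(conf)
    
    // One partition keeps the printed order deterministic.
    val dataRDD = sc.makeRDD(data, 1)
    // Split each line and join every two neighbouring fields with "-".
    val linkRDD = dataRDD.flatMap(_.split(",").sliding(2).map(_.mkString("-")))
    
    // foreach runs inside the tasks; in local mode the output appears in this console.
    linkRDD.foreach(println)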
    

    Output:

    A-B
    B-C
    C-D
    A-B
    B-C
    C-D
    A-B
    B-C
    C-D
    A-B
    B-C
    C-D
    A-B
    B-C
    C-D
    A-B
    B-C
    C-D
    A-B
    B-C
    A-B
    B-C
    A-B
    B-C
    A-B
    B-C
    A-B
    B-C
    B-C
    C-D
    B-C
    C-D
    B-C
    C-D
    B-C
    C-D
    C-D
    C-D
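
    To get back the count and the list that the question prints on the driver, the pairs can be collected rather than printed inside the tasks. The sketch below is one way to do that, under a few assumptions: `CollectSketch` and `links` are hypothetical names, the input is rebuilt in memory from the sample file in the question, and windows shorter than two fields are dropped so that a single-field line produces nothing:

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    object CollectSketch {
      // Builds all consecutive pairs and returns them to the driver.
      def links(sc: SparkContext, lines: Seq[String], partitions: Int): List[String] =
        sc.parallelize(lines, partitions)
          .flatMap(_.split(",").sliding(2).filter(_.length == 2).map(_.mkString("-")))
          .collect() // gathers the results of every partition on the driver
          .toList

      def main(args: Array[String]): Unit = {
        // Same 17 lines as the sample input in the question.
        val data = Seq.fill(6)("A,B,C,D") ++ Seq.fill(5)("A,B,C") ++
          Seq.fill(4)("B,C,D") ++ Seq.fill(2)("C,D")
        val conf = new SparkConf().setMaster("local[3]").setAppName("collect-sketch")
        val sc = new SparkContext(conf)
        val listLink = links(sc, data, 3)
        println("Number of items:" + listLink.size) // Number of items:38
        sc.stop()
      }
    }
    ```

    Since nothing is mutated from inside the tasks, the count should not depend on the number of cores or partitions. Note that `collect()` brings the whole result into driver memory, so it is only appropriate when the result is small.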