Tags: scala, apache-spark, set, seq

How to fill Scala Seq of Sets with unique values from Spark RDD?


I'm working with Spark and Scala. I have an RDD of Array[String] that I iterate over; each array holds the values of attributes like (name, age, work, ...). I'm using a Sequence of mutable Sets of Strings (called attributes) to collect all unique values for each attribute.

Think of the RDD as something like this:

("name1","21","JobA")
("name2","21","JobB")
("name3","22","JobA")

In the end I want something like this:

attributes = (("name1","name2","name3"),("21","22"),("JobA","JobB"))

I have the following code:

import scala.collection.mutable

val someLength = 10
val attributes = Seq.fill[mutable.Set[String]](someLength)(mutable.Set())
val splitLines = rdd.map(line => line.split("\t"))

splitLines.foreach(line => {
  for {(value, index) <- line.zipWithIndex} {
    attributes(index).add(value)
    // #1
  }
})

// #2

When I debug and stop at the line marked #1, everything looks fine: attributes is correctly filled with unique values.

But after the loop, at line #2, attributes is empty again. Inspecting it shows that attributes is a sequence of sets that are all of size 0.

Set()
Set()
...

What am I doing wrong? Is there some kind of scoping going on that I'm not aware of?


Solution

  • The answer lies in the fact that Spark is a distributed engine. Here is a rough idea of the problem you are facing: the elements of an RDD are bucketed into partitions, and each partition potentially lives on a different node.

    When you write rdd1.foreach(f), that f is wrapped inside a closure (which gets copies of the objects it references). This closure is serialized and sent to each node, where it is applied to every element of that partition.

    So your f gets a copy of attributes inside its closure, and when f is executed it interacts with that copy, not with the attributes you want. As a result, your attributes is left without any changes. (A driver-safe way to rebuild your original Seq of Sets is sketched after the code below.)

    I hope the problem is clear now.

    val yourRdd = sc.parallelize(List(
        ("name1","21","JobA"),
        ("name2","21","JobB"),
        ("name3","22","JobA")
    ))
    
    val yourNeededRdd = yourRdd
      .flatMap({ case (name, age, work) => List(("name", name), ("age", age), ("work", work)) })
      .groupBy({ case (attrName, attrVal) => attrName })
      .map({ case (attrName, group) => (attrName, group.toList.map(_._2).distinct) })
    
    // RDD(
    //     ("name", List("name1", "name2", "name3")),
    //     ("age", List("21", "22")),
    //     ("work", List("JobA", "JobB"))
    // )
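    
    // If you need these grouped values back on the driver (like your original
    // attributes structure), collectAsMap is a standard action on pair RDDs.
    // (attributesByName is just an illustrative name.)
    val attributesByName = yourNeededRdd.collectAsMap()
    // Map("name" -> List(...), "age" -> List(...), "work" -> List(...))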
    
    // Or
    
    val distinctNamesRdd = yourRdd.map(_._1).distinct
    // RDD("name1", "name2", "name3")
    
    val distinctAgesRdd = yourRdd.map(_._2).distinct
    // RDD("21", "22")
    
    val distinctWorksRdd = yourRdd.map(_._3).distinct
    // RDD("JobA", "JobB")