I'm working with Spark and Scala. I have an RDD of Array[String] that I iterate through. Each array contains values for attributes like (name, age, work, ...). I'm using a Sequence of mutable Sets of Strings (called attributes) to collect all the unique values for each attribute.
Think of the RDD as something like this:
("name1","21","JobA")
("name2","21","JobB")
("name3","22","JobA")
In the end I want something like this:
attributes = (("name1","name2","name3"),("21","22"),("JobA","JobB"))
I have the following code:
import scala.collection.mutable

val someLength = 10
val attributes = Seq.fill[mutable.Set[String]](someLength)(mutable.Set())
val splitLines = rdd.map(line => line.split("\t"))
splitLines.foreach(line => {
  for {(value, index) <- line.zipWithIndex} {
    attributes(index).add(value)
    // #1
  }
})
// #2
When I debug and stop at the line marked with #1, everything is fine; attributes is correctly filled with unique values.
But after the loop, at the line marked with #2, attributes is empty again. Inspecting it shows that attributes is a sequence of sets, all of size 0.
Set()
Set()
...
What am I doing wrong? Is there some kind of scoping going on that I'm not aware of?
The answer lies in the fact that Spark is a distributed engine. I will give you a rough idea of the problem you are facing. The elements of an RDD are bucketed into partitions, and each partition potentially lives on a different node.

When you write rdd1.foreach(f), that f is wrapped inside a closure (which gets copies of the corresponding objects). This closure is serialized and then sent to each node, where it is applied to every element of that partition.

Here, your f gets a copy of attributes in its wrapped closure, so when f is executed it interacts with that copy and not with the attributes you actually want. As a result, your attributes is left unchanged.
I hope the problem is clear now.
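To make the copy semantics concrete, here is a minimal sketch, analogous to the counter example in the Spark programming guide (it assumes an existing SparkContext named sc):

var counter = 0
val data = sc.parallelize(1 to 100)

// counter is captured in the closure; each executor increments its own
// deserialized copy, never the driver's variable
data.foreach(x => counter += x)

println(counter) // still 0 on a cluster (local mode may behave differently)

The idiomatic fix is to stop mutating driver-side state from inside actions and instead express the computation as RDD transformations, for example: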
val yourRdd = sc.parallelize(List(
("name1","21","JobA"),
("name2","21","JobB"),
("name3","22","JobA")
))
val yourNeededRdd = yourRdd
.flatMap({ case (name, age, work) => List(("name", name), ("age", age), ("work", work)) })
.groupBy({ case (attrName, attrVal) => attrName })
.map({ case (attrName, group) => (attrName, group.toList.map(_._2).distinct) })
// RDD(
// ("name", List("name1", "name2", "name3")),
// ("age", List("21", "22")),
// ("work", List("JobA", "JobB"))
// )
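If you need the result back on the driver as a plain Scala collection, you can collect it (a small sketch; it assumes the distinct values fit in driver memory):

val attributeMap: Map[String, List[String]] = yourNeededRdd.collect().toMap
// Map("name" -> List("name1", "name2", "name3"),
//     "age" -> List("21", "22"),
//     "work" -> List("JobA", "JobB"))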
Or, if you would rather have a separate RDD of distinct values per attribute:
val distinctNamesRdd = yourRdd.map(_._1).distinct
// RDD("name1", "name2", "name3")
val distinctAgesRdd = yourRdd.map(_._2).distinct
// RDD("21", "22")
val distinctWorksRdd = yourRdd.map(_._3).distinct
// RDD("JobA", "JobB")