Hi, I'm new to Spark with Scala. I have two different files, from which I have already created the two maps I need, as follows:
data 1:
1 : 2
2 : 1,3,4
3 : 2,4
4 : 2, 3
map1 counts, for each key, how many values appear after the ":". The output for map1 is:
(1, 1)
(2, 3)
(3, 2)
(4, 2)
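As a plain Scala sketch (standard collections only, no Spark), the counting that map1 performs on the data 1 lines could look like this. It assumes the right-hand values may be separated by commas and/or spaces, as in the sample, so it splits on "[,\\s]+" rather than "\\s+". The object and method names are hypothetical.

```scala
// Hypothetical plain-Scala mirror of map1 (no Spark): for each left-hand key,
// count the values listed after ":".
object Map1Sketch {
  val data1: Seq[String] = Seq("1 : 2", "2 : 1,3,4", "3 : 2,4", "4 : 2, 3")

  def countPerKey(lines: Seq[String]): Map[Long, Int] =
    lines
      .flatMap { line =>
        val parts = line.split(":", 2)
        val key = parts(0).trim.toLong
        // Assumption: values are separated by commas and/or whitespace.
        // Splitting " 2" yields a leading "", hence the nonEmpty filter.
        parts(1).split("[,\\s]+").filter(_.nonEmpty).map(_ => key)
      }
      .groupBy(identity)
      .map { case (key, occurrences) => key -> occurrences.size }
}
```

The key is converted to Long here because map2's positions are Long, and a join needs both sides to share the same key type.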
data 2:
apple
banana
kiwi
orange
strawberry
map2 gives the position of each element (starting from 1), and its output is:
(1, apple)
(2, banana)
(3, kiwi)
(4, orange)
(5, strawberry)
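A plain Scala sketch of the same 1-based numbering, with a hypothetical object name. Spark's RDD.zipWithIndex() also starts counting at 0 and returns Long indices, so the "+ 1" and the swap to (position, element) carry over directly.

```scala
// Hypothetical plain-Scala mirror of map2: 1-based position -> element.
object Map2Sketch {
  val data2: Seq[String] = Seq("apple", "banana", "kiwi", "orange", "strawberry")

  // zipWithIndex is 0-based, so shift by 1; use Long to match Spark's
  // RDD.zipWithIndex(), which produces Long indices.
  def positions(items: Seq[String]): Seq[(Long, String)] =
    items.zipWithIndex.map { case (item, index) => (index.toLong + 1, item) }
}
```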
What I need is to join the two maps, with the following output:
(1, apple, 1)
(2, banana, 3)
(3, kiwi, 2)
(4, orange, 2)
(5, strawberry, 0)
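The required output has the shape of a left outer join keyed on position: every element of map2 is kept, and a position with no entry in map1 (5, strawberry) gets a count of 0. A plain Scala sketch of that shape, with the sample values hard-coded and hypothetical names:

```scala
// Hypothetical plain-Scala mirror of the desired result: a left outer join
// from the positions (map2) to the counts (map1), defaulting to 0.
object JoinSketch {
  val positions: Seq[(Long, String)] =
    Seq((1L, "apple"), (2L, "banana"), (3L, "kiwi"), (4L, "orange"), (5L, "strawberry"))
  val counts: Map[Long, Int] = Map(1L -> 1, 2L -> 3, 3L -> 2, 4L -> 2)

  def joinWithDefault(ps: Seq[(Long, String)], cs: Map[Long, Int]): Seq[(Long, String, Int)] =
    ps.map { case (pos, name) => (pos, name, cs.getOrElse(pos, 0)) }
}
```

In Spark, leftOuterJoin returns the right-hand side as an Option, so getOrElse(0) on that Option plays the same role as getOrElse(pos, 0) here.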
I can only use org.apache.spark.SparkConf and org.apache.spark.SparkContext. Here is the code I use so far:
val sc = new SparkContext (conf)
val data1 = sc.textFile("input.txt")
val map1 = data1.map(x => (x.split(":")(0), x.split(":")(1))).flatMap{case (y,z) => z.split("\\s+").map((y,_))}
.filter(_._2.nonEmpty).sortByKey().countByKey()
val data2 = sc.textFile("input2.txt")
val map2 = data2.zipWithIndex().map{ case(v, index) => (v,index + 1)}
.map(pair => pair.swap)
val merge_map = map2.join(map1)
I want to join the two maps I have made, but it throws this error:
type mismatch;
found: scala.collection.Map[String, Long]
required: org.apache.spark.rdd.RDD[(Long,?)]
I was thinking I may need to change the type of the values in map1/map2. Any ideas how to do that? Thank you!
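The error comes from countByKey(): it is an action, so it returns a local scala.collection.Map on the driver rather than an RDD, while join only accepts an RDD of pairs with the same key type (Long here, because of zipWithIndex). If you wanted to keep countByKey, one workaround for small results is to turn the Map back into an RDD with sc.parallelize and make the key a Long. This is a sketch assuming Spark core on the classpath; it inlines the sample data with parallelize instead of sc.textFile, and the object name is hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CountByKeyWorkaround {
  // Sketch: keep countByKey, then parallelize the local Map back into an RDD.
  def run(sc: SparkContext): Array[(Long, (String, Long))] = {
    // Sample data inlined; the question reads these with sc.textFile(...)
    val data1 = sc.parallelize(Seq("1 : 2", "2 : 1,3,4", "3 : 2,4", "4 : 2, 3"))
    val data2 = sc.parallelize(Seq("apple", "banana", "kiwi", "orange", "strawberry"))

    val counted = data1
      .map(x => (x.split(":")(0).trim.toLong, x.split(":")(1))) // Long key, like map2
      .flatMap { case (k, vs) => vs.split("[,\\s]+").map(v => (k, v)) }
      .filter(_._2.nonEmpty)
      .countByKey()                           // scala.collection.Map[Long, Long] on the driver

    val map1 = sc.parallelize(counted.toSeq)  // back to an RDD of (Long, Long) pairs
    val map2 = data2.zipWithIndex().map { case (v, i) => (i + 1, v) } // (Long, String) pairs

    // Inner join: only positions present in both RDDs survive (no strawberry).
    map2.join(map1).sortBy(_._1).collect()
  }
}
```

This round-trips the counts through the driver, so it only makes sense when the number of keys is small.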
FOLLOW-UP QUESTION:
Now I need to create map3 from the same data, which counts the occurrences of each value on the right-hand side of the ":", and again join it with map2. Here is the output of map3 and the join result of map2 and map3 that I need:
output map3:
(1,1)
(2,3)
(3,2)
(4,2)
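map3 counts something different from map1: it tallies each value to the right of the ":" rather than the number of values per key (for this sample the two results happen to coincide). A plain Scala sketch of that tally, with the sample lines hard-coded, a hypothetical object name, and the same comma-or-whitespace splitting assumption:

```scala
// Hypothetical plain-Scala mirror of map3: how often each value appears
// on the right-hand side of ":", keyed by the value as a Long.
object Map3Sketch {
  val data1: Seq[String] = Seq("1 : 2", "2 : 1,3,4", "3 : 2,4", "4 : 2, 3")

  def valueCounts(lines: Seq[String]): Map[Long, Int] =
    lines
      .flatMap(line => line.split(":", 2)(1).split("[,\\s]+"))
      .filter(_.nonEmpty)   // drop the leading "" produced by splitting " 2"
      .map(_.toLong)
      .groupBy(identity)
      .map { case (value, occurrences) => value -> occurrences.size }
}
```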
join of map2 and map3:
(1, apple, 1)
(2, banana, 3)
(3, kiwi, 2)
(4, orange, 2)
(5, strawberry, 0)
Here is the code that I use:
val map3 = data1.map(x => (x.split(":")(0).toLong, x.split(":")(1))).flatMap{case (y,z) => z.split("\\s+").map((_,1))}.reduceByKey(_+_)
val merge_map23 = map2.leftOuterJoin(map3)
I got this error:
type mismatch;
found: org.apache.spark.rdd.RDD[String, Long]
required: org.apache.spark.rdd.RDD[(Long,?)]
I already fixed the previous code using the answer below, but now I get this error. Thank you!
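This error has a similar cause to the first one, this time in the key type: map((_, 1)) keys map3 by the value as a String, while map2 is keyed by Long, so leftOuterJoin cannot match them. One possible fix is to convert each right-hand value to Long before counting, then flatten the Option that leftOuterJoin returns. This sketch assumes Spark core on the classpath, inlines the sample data with parallelize instead of sc.textFile, and uses a hypothetical object name.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object Map3Join {
  def run(sc: SparkContext): Array[(Long, String, Int)] = {
    // Sample data inlined; the question reads these with sc.textFile(...)
    val data1 = sc.parallelize(Seq("1 : 2", "2 : 1,3,4", "3 : 2,4", "4 : 2, 3"))
    val data2 = sc.parallelize(Seq("apple", "banana", "kiwi", "orange", "strawberry"))

    // map3: key by the right-hand value itself, converted to Long so that it
    // matches map2's key type (the String key caused the type mismatch).
    val map3 = data1
      .flatMap(line => line.split(":", 2)(1).split("[,\\s]+"))
      .filter(_.nonEmpty)
      .map(v => (v.toLong, 1))
      .reduceByKey(_ + _)                     // (Long, Int) pairs

    val map2 = data2.zipWithIndex().map { case (v, i) => (i + 1, v) } // (Long, String) pairs

    // leftOuterJoin keeps every fruit; a missing count arrives as None.
    map2.leftOuterJoin(map3)
      .map { case (k, (name, count)) => (k, name, count.getOrElse(0)) }
      .sortBy(_._1)
      .collect()
  }
}
```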
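Don't use countByKey. It is an action that returns a local scala.collection.Map to the driver, not an RDD, so it cannot be joined with another RDD. Use reduceByKey instead: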
val map1 = data1.map(x => (x.split(":")(0).trim.toLong, x.split(":")(1))) // Long key, to match map2
  .flatMap{case (y,z) => z.split("[,\\s]+").map((y,_))} // values may be separated by commas or spaces
  .filter(_._2.nonEmpty).mapValues(_ => 1).reduceByKey(_ + _)
Then don't use collectAsMap (it also returns a local Map on the driver rather than an RDD):
val map2 = data2.zipWithIndex().map{ case(v, index) => (v,index + 1)}
.map(pair => pair.swap)
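Finally, join them. Because the expected output keeps strawberry with a count of 0, join from map2 with a left outer join and replace the missing count with 0:
map2.leftOuterJoin(map1).map { case (k, (name, count)) => (k, name, count.getOrElse(0)) }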
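Put together, the reduceByKey route might look like the sketch below: Long keys on both sides, a left outer join from map2 so strawberry survives, and getOrElse(0) to turn its missing count into 0. It assumes Spark core on the classpath, splits on commas or whitespace to match the sample data, inlines that data with parallelize instead of sc.textFile, and uses a hypothetical object name.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object Map1Join {
  def run(sc: SparkContext): Array[(Long, String, Int)] = {
    // Sample data inlined; the question reads these with sc.textFile(...)
    val data1 = sc.parallelize(Seq("1 : 2", "2 : 1,3,4", "3 : 2,4", "4 : 2, 3"))
    val data2 = sc.parallelize(Seq("apple", "banana", "kiwi", "orange", "strawberry"))

    // map1: number of right-hand values per left-hand key, keyed by Long
    val map1 = data1
      .map(x => (x.split(":")(0).trim.toLong, x.split(":")(1)))
      .flatMap { case (k, vs) => vs.split("[,\\s]+").map(v => (k, v)) }
      .filter(_._2.nonEmpty)
      .mapValues(_ => 1)
      .reduceByKey(_ + _)

    // map2: 1-based position -> fruit, also keyed by Long
    val map2 = data2.zipWithIndex().map { case (v, i) => (i + 1, v) }

    // Left outer join from map2 keeps strawberry; its missing count becomes 0.
    map2.leftOuterJoin(map1)
      .map { case (k, (name, count)) => (k, name, count.getOrElse(0)) }
      .sortBy(_._1)
      .collect()
  }
}
```

Everything stays an RDD until the final collect(), which is only there to bring the small result back for printing.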