
Spark scala: change numeric values in key-value pair to long/integer to join two maps


Hi, I'm new to Spark with Scala. I have two different files, from which I have already created the two maps I need:

data 1:
1 : 2
2 : 1,3,4
3 : 2,4
4 : 2, 3

map1 counts, for each key, how many values appear after the ":". The output for map1 is:

(1, 1)
(2, 3)
(3, 2)
(4, 2)
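To make the counting step concrete, here is a minimal sketch of map1 using plain Scala collections instead of RDDs, so it runs without a cluster. It assumes the values after ":" may be separated by commas and/or spaces, as in the sample data; Map1Sketch and its methods are hypothetical names. On an RDD, the last step would be mapValues(_ => 1).reduceByKey(_ + _).

```scala
object Map1Sketch {
  // Sample lines copied from "data 1" above.
  val data1: Seq[String] = Seq("1 : 2", "2 : 1,3,4", "3 : 2,4", "4 : 2, 3")

  // For each line "key : v1,v2,...", emit one (key, value) pair per value.
  // The key is trimmed and converted to Long so it can later be joined
  // with the Long positions produced by zipWithIndex.
  def pairs(lines: Seq[String]): Seq[(Long, String)] =
    lines.flatMap { line =>
      val parts = line.split(":", 2)
      val key = parts(0).trim.toLong
      // Split on commas and/or whitespace; drop the empty leading token.
      parts(1).split("[,\\s]+").filter(_.nonEmpty).map(v => (key, v)).toSeq
    }

  // Count how many values follow each key.
  def countPerKey(lines: Seq[String]): Map[Long, Int] =
    pairs(lines).groupBy(_._1).map { case (k, kvs) => (k, kvs.size) }
}
```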

data 2:

apple
banana
kiwi
orange
strawberry

map2 maps each element's position (starting at 1) to the element. Its output is:

(1, apple)
(2, banana)
(3, kiwi)
(4, orange)
(5, strawberry)
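A plain-collections sketch of map2, under the same assumptions as before (Map2Sketch is a hypothetical name). Note that RDD.zipWithIndex() yields Long indices while Seq.zipWithIndex yields Int, so the sketch converts explicitly to keep the key type Long.

```scala
object Map2Sketch {
  // Sample lines copied from "data 2" above.
  val data2: Seq[String] = Seq("apple", "banana", "kiwi", "orange", "strawberry")

  // zipWithIndex numbers elements from 0, so add 1; then swap so the
  // position becomes the key: (position, element).
  def indexed(lines: Seq[String]): Seq[(Long, String)] =
    lines.zipWithIndex.map { case (v, i) => (v, i.toLong + 1) }.map(_.swap)
}
```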

What I need is to join the two maps so that the output is:

(1, apple, 1)
(2, banana, 3)
(3, kiwi, 2)
(4, orange, 2)
(5, strawberry, 0)

I can only use org.apache.spark.SparkConf and org.apache.spark.SparkContext. Here is the code I have so far:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(conf)
    val data1 = sc.textFile("input.txt")
    val map1 = data1.map(x => (x.split(":")(0), x.split(":")(1)))
      .flatMap { case (y, z) => z.split("\\s+").map((y, _)) }
      .filter(_._2.nonEmpty).sortByKey().countByKey()
    val data2 = sc.textFile("input2.txt")
    val map2 = data2.zipWithIndex().map { case (v, index) => (v, index + 1) }
      .map(pair => pair.swap)

    val merge_map = map2.join(map1)

I want to join the two maps, but it throws this error:

    type mismatch;
    found: scala.collection.Map[String, Long]
    required: org.apache.spark.rdd.RDD[(Long,?)]

I was thinking maybe I need to change the type of the values in map1/map2. Any ideas how to do that? Thank you!
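The part whose type has to change is the key: map2 is keyed by Long (from zipWithIndex), while map1 is keyed by the String before the ":". A minimal sketch of the conversion, assuming the "key : values" line format shown above; KeyTypeSketch and its methods are hypothetical. Splitting "1 : 2" on ":" leaves a trailing space in the key ("1 "), so it must be trimmed before toLong. In the RDD code the same conversion would be x.split(":")(0).trim.toLong.

```scala
import scala.util.Try

object KeyTypeSketch {
  // Trim the space before ":" and parse the key as Long.
  def parseKey(line: String): Long = line.split(":")(0).trim.toLong

  // Without trim, "1 ".toLong throws NumberFormatException,
  // which this wrapper turns into None.
  def parseKeyUntrimmed(line: String): Option[Long] =
    Try(line.split(":")(0).toLong).toOption
}
```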

FOLLOW-UP QUESTION:

Now I need to create map3 from the same data, counting how many times each value on the right-hand side of the ":" occurs, and then join it with map2 again. Here is the output of map3 and the result I need from joining map2 and map3.

Output of map3:

(1,1)
(2,3)
(3,2)
(4,2)

Join of map2 and map3:

(1, apple, 1)
(2, banana, 3)
(3, kiwi, 2)
(4, orange, 2)
(5, strawberry, 0)

Here is the code that I use:

    val map3 = data1.map(x => (x.split(":")(0).toLong, x.split(":")(1)))
      .flatMap { case (y, z) => z.split("\\s+").map((_, 1)) }
      .reduceByKey(_ + _)

    val merge_map23 = map2.leftOuterJoin(map3)

I get this error:

    type mismatch;
    found: org.apache.spark.rdd.RDD[String, Long]
    required: org.apache.spark.rdd.RDD[(Long,?)]

I already fixed the previous code using the answer below, but now I get this error. Thank you!
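This error has the same root cause as the first one: map((_, 1)) keys map3 by the String value, while map2 is keyed by Long. The key before ":" is not needed for map3 at all (and calling toLong on it untrimmed would throw at runtime), so a sketch can read only the right-hand side and convert each value to Long before counting. This uses plain Scala collections; Map3Sketch is a hypothetical name, and the RDD analogue of the last two steps would be .map(v => (v.toLong, 1)).reduceByKey(_ + _).

```scala
object Map3Sketch {
  // Sample lines copied from "data 1" above.
  val data1: Seq[String] = Seq("1 : 2", "2 : 1,3,4", "3 : 2,4", "4 : 2, 3")

  // Keep only the right-hand side of ":", split it into values, key each
  // value by its Long form (matching map2's keys), and sum the 1s per key.
  def countValues(lines: Seq[String]): Map[Long, Int] =
    lines
      .flatMap(line => line.split(":", 2)(1).split("[,\\s]+").filter(_.nonEmpty).toSeq)
      .map(v => (v.toLong, 1))
      .groupMapReduce(_._1)(_._2)(_ + _)
}
```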


Solution

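  • Don't use countByKey: it is an action that returns a local scala.collection.Map, not an RDD, which is why the join fails. Use reduceByKey instead: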

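    // Convert the key to Long (trimming the space before ":") so it matches
    // map2's keys; split the values on commas and/or whitespace.
    val map1 = data1.map(x => (x.split(":")(0).trim.toLong, x.split(":")(1)))
      .flatMap { case (y, z) => z.split("[,\\s]+").map((y, _)) }
      .filter(_._2.nonEmpty).mapValues(_ => 1).reduceByKey(_ + _)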
    

    Then don't use collectAsMap; keep map2 as an RDD:

    val map2 = data2.zipWithIndex().map{ case(v, index) => (v,index + 1)}
      .map(pair => pair.swap)
    

    Finally, join:

    map1.join(map2)
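join is an inner join, so keys present in only one side are dropped: strawberry (key 5) has no entry in map1 and would not appear, although the expected output in the question includes (5, strawberry, 0). A left outer join from map2 with a default count of 0 produces that row. The sketch below uses plain Scala collections with the question's sample values; JoinSketch is a hypothetical name, and the RDD analogue is shown in a comment.

```scala
object JoinSketch {
  // map2: position -> fruit (Long keys, as produced by zipWithIndex + 1).
  val map2: Seq[(Long, String)] =
    Seq((1L, "apple"), (2L, "banana"), (3L, "kiwi"), (4L, "orange"), (5L, "strawberry"))

  // map1: key -> count, from the question's expected output.
  val map1: Map[Long, Int] = Map(1L -> 1, 2L -> 3, 3L -> 2, 4L -> 2)

  // Left outer join: keep every map2 entry, defaulting a missing count to 0.
  // RDD analogue:
  //   map2.leftOuterJoin(map1)
  //     .map { case (k, (fruit, c)) => (k, fruit, c.getOrElse(0)) }
  def joined: Seq[(Long, String, Int)] =
    map2.map { case (k, fruit) => (k, fruit, map1.getOrElse(k, 0)) }
}
```

On an RDD the joined result is not ordered; calling sortBy(_._1) before collect() would give the order shown in the question.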