Tags: scala, apache-spark, join, rdd, broadcast

Scala word conversion between 2 RDDs


The first dataset has two columns separated by '\t'. The first column is a number and the second column is its meaning. Both columns are Strings.

111  A
112  B
113  C
114  D
115  E
116  F
117  G
118  H
...

The other dataset also has two columns, both Strings. Its format is:

111  112:0.75,114:0.43,117:0.21
112  113:0.67,114:0.48,115:0.34,116:0.12
113  114:0.33,118:0.12
...

I need to translate the numbers in the second dataset into their meanings. The result should look like this:

 A  B:0.75,D:0.43,G:0.21
 B  C:0.67,D:0.48,E:0.34,F:0.12
 ...

PS: all of these values are Strings!

How should I code this? Also, the first dataset (rdd1) is small, while the second dataset (rdd2) is large. When running on a Spark cluster, is it necessary to use a broadcast variable, and if so, how exactly? Thank you for your answers.


Solution

  • Here is one way of doing it, assuming both of your RDDs are of type RDD[(String, String)] and your first RDD is the smaller one.
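
    Side note: since the real input is tab-separated text rather than pre-built pairs, you could load each file into an RDD[(String, String)] roughly like the sketch below (the paths are hypothetical, and a live SparkContext named sc is assumed); the example that follows uses sc.parallelize instead so it stays self-contained.

    //hypothetical paths; each line is expected to be "number<TAB>value"
    val rdd1FromFile = sc.textFile("hdfs:///path/to/meanings.txt")
      .map(_.split("\t", 2)) //split on the first tab only
      .map(a => (a(0), a(1)))
    val rdd2FromFile = sc.textFile("hdfs:///path/to/relations.txt")
      .map(_.split("\t", 2))
      .map(a => (a(0), a(1)))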

    //import the RDD type used in the annotations below
    import org.apache.spark.rdd.RDD

    //create your first RDD
    val rdd1: RDD[(String, String)] = sc.parallelize(Seq(
      ("111", "A"),
      ("112", "B"),
      ("113", "C"),
      ("114", "D"),
      ("115", "E"),
      ("116", "F"),
      ("117", "G")))
    
    //this rdd is small, so collect it and convert it to a map
    val mapRdd1: Map[String, String] = rdd1.collect.toMap
    //broadcast this map to all executors
    val bRdd = sc.broadcast(mapRdd1)
    
    //create your second rdd
    val rdd2: RDD[(String, String)] = sc.parallelize(Seq(
      ("111", "112:0.75,114:0.43,117:0.21"),
      ("112", "113:0.67,114:0.48,115:0.34,116:0.12")))
    
    val result: RDD[(String, String)] = rdd2.map(x => (x._1, //keep first string as it is
      x._2.split(",").map(a => a.split(":")) //split second string for the required transformations
        //fetch each number's meaning from the broadcast map
        .map(t => (bRdd.value(t.head), t.last)).mkString(" ")))
    
    result.foreach(println(_))
    
    //output
    //(111,(B,0.75) (D,0.43) (G,0.21))
    //(112,(C,0.67) (D,0.48) (E,0.34) (F,0.12))
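
    If you want the output to match the question's format exactly (first column translated as well, and the second column kept as meaning:score pairs joined by commas), a small variation of the same map works; this is just a sketch under the same assumptions as above:

    //variant: translate the first column too and keep the "meaning:score,..." layout
    val exact: RDD[(String, String)] = rdd2.map { case (key, values) =>
      (bRdd.value(key),
        values.split(",")
          .map(_.split(":"))
          .map(t => s"${bRdd.value(t.head)}:${t.last}")
          .mkString(","))
    }

    exact.foreach(println(_))

    //output
    //(A,B:0.75,D:0.43,G:0.21)
    //(B,C:0.67,D:0.48,E:0.34,F:0.12)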
    

    This assumes that every number appearing in rdd2 has a meaning in your first RDD. If not, then while fetching values from the map, use bRdd.value.getOrElse(t.head, "DEFAULT_VALUE") instead.
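
    For example, the same transformation with that fallback (a sketch; "DEFAULT_VALUE" is just a placeholder for whatever you want unknown numbers to map to):

    //unknown numbers fall back to a placeholder instead of throwing a NoSuchElementException
    val safeResult: RDD[(String, String)] = rdd2.map(x => (x._1,
      x._2.split(",").map(_.split(":"))
        .map(t => (bRdd.value.getOrElse(t.head, "DEFAULT_VALUE"), t.last)).mkString(" ")))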