I have a data set with two columns separated by '\t'. The first column is a number and the second column is its meaning. Both columns are Strings.
111 A
112 B
113 C
114 D
115 E
116 F
117 G
118 H
...
The other data set also has two columns, both Strings, in this format:
111 112:0.75,114:0.43,117:0.21
112 113:0.67,114:0.48,115:0.34,116:0.12
113 114:0.33,118:0.12
...
I need to translate the numbers in the second data set into their meanings. The result should look like this:
A B:0.75,D:0.43,G:0.21
B C:0.67,D:0.48,E:0.34,F:0.12
...
PS: All of these values are Strings!
How do I code this? Also, the first data set (rdd1) is small and the second (rdd2) is large. When running on a Spark cluster, is it necessary to use a broadcast variable, and if so, how exactly? Thank you for your answers.
Here is one way of doing it, assuming both of your RDDs are of type RDD[(String, String)] and your first RDD is smaller in size.
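If the data starts out as tab-separated text, as described in the question, each line first has to be turned into a (String, String) pair. The following is only a sketch of that step: the object name `ParseTsv` and the function `parseLine` are hypothetical, and skipping lines without a tab is an assumed policy.

```scala
object ParseTsv {
  // Split a line on the first tab only, so the second column stays whole.
  // Lines without a tab (for example blank lines) yield None.
  def parseLine(line: String): Option[(String, String)] =
    line.split("\t", 2) match {
      case Array(key, value) => Some((key, value))
      case _                 => None
    }
}
```

With an existing SparkContext `sc` and a placeholder `path`, this could be applied as `sc.textFile(path).flatMap(line => ParseTsv.parseLine(line))` to obtain an RDD[(String, String)].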
import org.apache.spark.rdd.RDD

//create your first RDD (sc is an existing SparkContext)
val rdd1: RDD[(String, String)] = sc.parallelize(Seq(
("111", "A"),
("112", "B"),
("113", "C"),
("114", "D"),
("115", "E"),
("116", "F"),
("117", "G")))
//this RDD is small, so collect it to the driver and convert it to a map
val mapRdd1: Map[String, String] = rdd1.collect.toMap
//broadcast this map to all executors
val bRdd = sc.broadcast(mapRdd1)
//create your second rdd
val rdd2: RDD[(String, String)] = sc.parallelize(Seq(
("111", "112:0.75,114:0.43,117:0.21"),
("112", "113:0.67,114:0.48,115:0.34,116:0.12")))
val result: RDD[(String, String)] = rdd2.map(x => (bRdd.value(x._1), //translate the first string as well
x._2.split(",").map(a => a.split(":")) //split second string for the required transformations
//fetch the value from the broadcast map and rebuild "meaning:score"
.map(t => bRdd.value(t.head) + ":" + t.last).mkString(",")))
//prints on the driver in local mode; on a cluster use result.collect.foreach(println)
result.foreach(println(_))
//output
//(A,B:0.75,D:0.43,G:0.21)
//(B,C:0.67,D:0.48,E:0.34,F:0.12)
This assumes that all the numbers used in rdd2 have a meaning in your first RDD. If not, then while fetching values from the map use bRdd.value.getOrElse(t.head, "DEFAULT_VALUE").
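As a minimal sketch of that fallback, the per-record translation can be pulled into a plain function that takes the lookup map as an argument. The names `Translate` and `translateRecord` are hypothetical, and keeping an unknown number unchanged (rather than substituting a fixed default) is an assumption made for illustration.

```scala
object Translate {
  // Translate one (id, "id:score,id:score,...") record using the lookup map.
  // Ids missing from the map are kept as they are instead of throwing
  // NoSuchElementException (an assumed fallback; choose whatever default fits).
  def translateRecord(lookup: Map[String, String])(record: (String, String)): (String, String) = {
    val (id, neighbours) = record
    val translated = neighbours.split(",").map { pair =>
      // split on the first ':' only, so the score is kept verbatim
      pair.split(":", 2) match {
        case Array(key, score) => lookup.getOrElse(key, key) + ":" + score
        case _                 => pair // malformed entry: leave it unchanged
      }
    }.mkString(",")
    (lookup.getOrElse(id, id), translated)
  }
}
```

Inside the Spark job this could be called as `rdd2.map(rec => Translate.translateRecord(bRdd.value)(rec))`, so that `bRdd.value` is read on the executors rather than captured from the driver.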