How to create Key-Value RDD (Scala)


I have the following RDD (name: AllTrainingDATA_RDD) which is of type org.apache.spark.rdd.RDD[(String, Double, Double, String)] :

(ICCH_1,4.3,3.0,Iris-setosa)
(ICCH_1,4.4,2.9,Iris-setosa)
(ICCH_1,4.4,3.0,Iris-setosa)
(ICCH_2,4.4,3.2,Iris-setosa)

1st column : ICCH_ID, 2nd column: X_Coordinates, 3rd Column: Y_Coordinates, 4th column: Class

I would like to end up with an RDD which has 2nd and 3rd column as the Key and 4th column as Value. The column ICCH_ID should remain in the RDD.

My current attempt, based on some Internet research, is this:

val AllTrainingDATA_RDD_Final = AllTrainingDATA_RDD.map(_.split(",")).keyBy(_(X_COORD,Y_COORD)).mapValues(fields => ("CLASS")).groupByKey().collect()

However I get this error:

error: value split is not a member of (String, Double, Double, String)

P.S. I am using Databricks Community Edition. I am new to Scala.


Solution

  • Let's try to break down your solution, part by part:

    val AllTrainingDATA_RDD_Final = AllTrainingDATA_RDD
        .map(_.split(","))
        .keyBy(_(X_COORD,Y_COORD))
        .mapValues(fields => ("CLASS"))
        .groupByKey()
        .collect()
    

    Your first problem is the use of .map(_.split(",")). This is likely a preprocessing step meant for an RDD[String], where it extracts the comma-separated values from each line of text input. Since your RDD already contains parsed tuples, you can drop that part.
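
    For context, a split-based step belongs at the point where the data is still raw text. The following is a minimal sketch of that stage, assuming input lines shaped like ICCH_1,4.3,3.0,Iris-setosa. The names ParseSketch and parseLine are hypothetical, and a plain Seq stands in for an RDD[String] so the example runs without Spark; the same function could be passed to map on an RDD[String].

```scala
object ParseSketch {
  // Hypothetical helper: turns one comma-separated line into the tuple type from the question.
  def parseLine(line: String): (String, Double, Double, String) = {
    val fields = line.split(",").map(_.trim)
    require(fields.length == 4, s"expected 4 fields, got ${fields.length}: $line")
    (fields(0), fields(1).toDouble, fields(2).toDouble, fields(3))
  }

  def main(args: Array[String]): Unit = {
    // A plain Seq stands in for an RDD[String]; with Spark this would be rawLines.map(parseLine).
    val rawLines = Seq("ICCH_1,4.3,3.0,Iris-setosa", "ICCH_2,4.4,3.2,Iris-setosa")
    rawLines.map(parseLine).foreach(println)
  }
}
```

    Once the lines have been parsed this way, the elements are tuples rather than strings, which is why calling split on them again fails to compile.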

    Your second problem will come from .keyBy(_(X_COORD,Y_COORD)), and it's going to look something like this:

    error: (String, Double, Double, String) does not take parameters

    This is because you supplied keyBy with an anonymous function that tries to call each tuple in your RDD as if it were a function, passing (X_COORD,Y_COORD) as arguments. What you actually want is a function that extracts the x and y coordinates (2nd and 3rd values) from your tuple. One way to achieve this is .keyBy{case (_, x, y, _) => (x, y)}
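
    To see what such a key function produces, here is a small sketch on plain Scala collections. It assumes keyBy pairs each element with the key computed from it, so the original tuple stays intact as the value. KeyBySketch, Row, coordinates and the local keyBy helper are hypothetical names used only for illustration, not Spark API.

```scala
object KeyBySketch {
  type Row = (String, Double, Double, String)

  // Key extractor: the same pattern match that would be passed to keyBy on the RDD.
  val coordinates: Row => (Double, Double) = { case (_, x, y, _) => (x, y) }

  // Plain-collection stand-in for keyBy: pairs every element with its computed key.
  def keyBy[K](rows: Seq[Row])(f: Row => K): Seq[(K, Row)] =
    rows.map(r => (f(r), r))

  def main(args: Array[String]): Unit = {
    val rows: Seq[Row] = Seq(
      ("ICCH_1", 4.3, 3.0, "Iris-setosa"),
      ("ICCH_2", 4.4, 3.2, "Iris-setosa")
    )
    // Each result is a pair of ((x, y), original tuple).
    keyBy(rows)(coordinates).foreach(println)
  }
}
```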

    Lastly, your use of mapValues just produces the same String value ("CLASS") for all elements in the RDD. Instead, you can simply take the 4th value in the tuple like so: .mapValues(_._4)

    Putting this all together, you get the following code:

    val AllTrainingDATA_RDD_Final = AllTrainingDATA_RDD
        .keyBy{case (_, x, y, _) => (x, y)}
        .mapValues(_._4)
        .groupByKey()
        .collect()
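
    The question also asks that ICCH_ID remain in the result, which .mapValues(_._4) discards. One possible way to keep it is to build the pairs directly with map and carry the ID inside the value. This is a sketch under assumptions: KeepIdSketch, Row and keyByCoordinates are hypothetical names, and the local SparkSession is only there to make the example self-contained (a Databricks notebook already provides one as spark).

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object KeepIdSketch {
  type Row = (String, Double, Double, String)

  // Key: (x, y). Value: (ICCH_ID, class), so the ID travels alongside the class.
  def keyByCoordinates(rows: RDD[Row]): RDD[((Double, Double), (String, String))] =
    rows.map { case (id, x, y, cls) => ((x, y), (id, cls)) }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder().master("local[*]").appName("KeepIdSketch").getOrCreate()
    val rows = spark.sparkContext.parallelize(Seq[Row](
      ("ICCH_1", 4.3, 3.0, "Iris-setosa"),
      ("ICCH_1", 4.4, 2.9, "Iris-setosa"),
      ("ICCH_1", 4.4, 3.0, "Iris-setosa"),
      ("ICCH_2", 4.4, 3.2, "Iris-setosa")
    ))
    // groupByKey gathers all (ICCH_ID, class) values that share the same coordinates.
    keyByCoordinates(rows).groupByKey().collect().foreach(println)
    spark.stop()
  }
}
```

    Note that collect() returns a local Array on the driver. If the goal is to end up with an RDD, as the question states, leave out the final collect() call.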
    

    Since you are new to Scala, I suggest you take some time to get acquainted with the syntax, features and APIs before you continue. It will help you understand and overcome such problems much faster.