Tags: scala, apache-spark, zip, rdd, databricks

How to zip two RDDs (Scala)


I have two RDDs which have this form:

RDD A: (Columns: icchID, X_Coord)

[ICCH 1,10.0]
[ICCH 2,10.0]
[ICCH 4,100.0]
[ICCH 4,100.0]
[ICCH 2,13.0]

RDD B: (Columns: Y_Coord, Class)

[10.0,A]
[64.0,B]
[39.0,A]
[9.0,C]
[80.0,D]

I would like to combine those two RDDs so that I have an RDD which has this form:

[ICCH 1,10.0,10.0,A]
[ICCH 2,10.0,64.0,B]
[ICCH 4,100.0,39.0,A]
[ICCH 4,100.0,9.0,C]
[ICCH 2,13.0,80.0,D]

Note that both RDDs have the same number of rows and columns. Is it possible to do this using Scala?

P.S. I am a noob at Scala. I am using Databricks.


Solution

  • You may consider using the RDD zip method along with a map transformation:

    val rddA = sc.parallelize(Seq(
      ("ICCH 1", 10.0), ("ICCH 2", 10.0), ("ICCH 4", 100.0), ("ICCH 5", 100.0), ("ICCH 2", 13.0)
    ))
    
    val rddB = sc.parallelize(Seq(
      (10.0, "A"), (64.0, "B"), (39.0, "A"), (9.0, "C"), (80.0, "D")
    ))
    
    val zippedRDD = (rddA zip rddB).map{ case ((id, x), (y, c)) => (id, x, y, c) }
    // zippedRDD: org.apache.spark.rdd.RDD[(String, Double, Double, String)] = ...
    
    zippedRDD.collect
    // Array[(String, Double, Double, String)] = Array(
    //   (ICCH 1,10.0,10.0,A), (ICCH 2,10.0,64.0,B), (ICCH 4,100.0,39.0,A), (ICCH 4,100.0,9.0,C), (ICCH 2,13.0,80.0,D)
    // )
    

    Be cautioned, though: maintaining the ordering between two RDDs is a tricky matter, and there are several SO discussions about this ordering issue.
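
    If the partitioning assumptions behind zip are not guaranteed to hold in your pipeline, a common workaround is to pair rows explicitly by index with zipWithIndex and a join. The sketch below reuses the rddA and rddB values from above; it is an illustrative alternative, not part of the zip-based answer:

    // Key each row by a stable per-element index, then join on that index.
    val indexedA = rddA.zipWithIndex.map { case (row, idx) => (idx, row) }
    val indexedB = rddB.zipWithIndex.map { case (row, idx) => (idx, row) }

    val joinedRDD = indexedA.join(indexedB)
      .sortByKey()  // restore the original row order after the join's shuffle
      .map { case (_, ((id, x), (y, c))) => (id, x, y, c) }

    joinedRDD.collect
    // Array((ICCH 1,10.0,10.0,A), (ICCH 2,10.0,64.0,B), ...)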

    And below is what the Spark API doc says about the RDD zip method:

    def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
    

    Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).
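
    As a quick illustration of that requirement, here is a minimal sketch (the toy data is an assumption, and the exact exception text may vary by Spark version):

    // Two RDDs with different partition counts cannot be zipped.
    val a = sc.parallelize(1 to 4, 2)                    // 2 partitions
    val b = sc.parallelize(Seq("w", "x", "y", "z"), 4)   // 4 partitions

    // a.zip(b).collect
    // fails at runtime with something like:
    //   java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions

    The safest way to satisfy the constraint is to derive one RDD from the other (e.g. via a map), or to fall back to the zipWithIndex + join approach sketched above.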