Tags: scala, apache-spark, rdd

How do I join two RDDs based on a common field?


I am very new to Scala and am learning to work with RDDs. I have two CSV files with the following headers and data: csv1.txt

id,"location", "zipcode" 
1, "a", "12345"
2, "b", "67890"
3, "c" "54321"

csv2.txt

"location_x", "location_y", trip_hrs
"a", "b", 1
"a", "c", 3
"b", "c", 2
"a", "b", 1
"c", "b", 2

Basically, csv1 data is a distinct set of locations and zip codes, whereas csv2 data has the trip duration between location_x and location_y.

The common piece of information in these two data sets is location in csv1 and location_x in csv2 even though they have different header names.

I would like to create two RDDs with one containing the data from csv1 and the other from csv2.

Then I would like to join these two RDDs and return the location, zipcode, and sum of all trip times from that location as shown below:

("a", "zipcode", 5)
("b", "zipcode", 2)
("c", "zipcode", 2)

I was wondering if one of you could assist me with this problem. Thanks.


Solution

  • I will give you the code (a complete app in IntelliJ) with some explanations; I hope it helps.

    Please read the Spark documentation for full details on working with key-value pairs:

    https://spark.apache.org/docs/latest/rdd-programming-guide.html#working-with-key-value-pairs

    This problem can also be solved with Spark DataFrames; a rough sketch of that approach is included after the RDD code below, and you can try it yourself.

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession
    
    object Joining {
    
      val spark = SparkSession
        .builder()
        .appName("Joining")
        .master("local[*]")
        .config("spark.sql.shuffle.partitions", "4") //Change to a more reasonable default number of partitions for our data
        .config("spark.app.id", "Joining")  // To silence Metrics warning
        .getOrCreate()
    
      val sc = spark.sparkContext
    
      val path = "/home/cloudera/files/tests/"
    
      def main(args: Array[String]): Unit = {
    
        Logger.getRootLogger.setLevel(Level.ERROR)
    
        try {
    
          // read the files
          val file1 = sc.textFile(s"${path}join1.csv")
          val header1 = file1.first // extract the header of the file
          val file2 = sc.textFile(s"${path}join2.csv")
          val header2 = file2.first // extract the header of the file
    
          val rdd1 = file1
            .filter(line => line != header1) // to leave out the header
            .map(line => line.split(",")) // split the lines => Array[String]
            .map(arr => (arr(1).trim, arr(2).trim)) // build a pair RDD keyed by location (arr(1)), with zipcode as the value
            // note: the values still carry the quote characters from the raw CSV; both files quote locations the same way, so the join keys match
    
          val rdd2 = file2
              .filter(line => line != header2)
              .map(line => line.split(",")) // split the lines => Array[String]
              .map(arr => (arr(0).trim, arr(2).trim.toInt)) // pair RDD keyed by location_x (arr(0)), with trip_hrs as an Int value
    
          val joined = rdd1 // join the pairRDD by its keys
              .join(rdd2)
              .cache()  // cache joined in memory
    
          joined.foreach(println) // checking data
          println("**************")
    
    //      ("c",("54321",2))
    //      ("b",("67890",2))
    //      ("a",("12345",1))
    //      ("a",("12345",3))
    //      ("a",("12345",1))
    
          val result = joined.reduceByKey { case ((zip, time), (_, time1)) => (zip, time + time1) } // sum the trip hours per location, keeping the zipcode
    
          result.map { case (loc, (zip, time)) => (loc, zip, time) }.foreach(println) // final output: (location, zipcode, total trip hours)
    
    //      ("b","67890",2)
    //      ("c","54321",2)
    //      ("a","12345",5)
    
          // keep the app alive so the Spark web UI can be inspected at http://localhost:4041/
          println("Press Enter in the console to exit...")
          scala.io.StdIn.readLine()
        } finally {
          sc.stop()
          println("SparkContext stopped")
          spark.stop()
          println("SparkSession stopped")
        }
      }
    }
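
    As mentioned above, the same computation can be expressed with Spark DataFrames. The snippet below is only a rough sketch: it assumes the same `spark` session and `path` as the RDD app, and that the columns come out as location, zipcode, location_x and trip_hrs once the quoted headers are parsed; the quoted, space-padded values in the sample files may need extra read options (for example ignoreLeadingWhiteSpace) or an explicit trim to line up.

    import org.apache.spark.sql.functions.{sum, trim}

    val df1 = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("ignoreLeadingWhiteSpace", "true")
      .csv(s"${path}join1.csv") // id, location, zipcode

    val df2 = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("ignoreLeadingWhiteSpace", "true")
      .csv(s"${path}join2.csv") // location_x, location_y, trip_hrs

    val dfResult = df1
      .join(df2, trim(df1("location")) === trim(df2("location_x"))) // join on the common location column
      .groupBy(df1("location"), df1("zipcode"))
      .agg(sum("trip_hrs").as("total_trip_hrs")) // total trip hours per location

    dfResult.show()

    If you need the same triple shape as the RDD version, dfResult.rdd gives you back an RDD of Rows that you can map over.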