
How to join multiple RDDs of case-class objects using Scala


I'm new to Scala and Spark and need help with the following.
I have three files, address, postal_code and continent, and I have read each of them into an RDD. Now I need to count, per continent, the addresses whose address line contains "stratra", using Scala and Spark.

For example, the three files contain:

address             postalcode         continent
stratra 110011     110011 india        india asia
knagar 660011      660011 usa          usa usa
stratra 110012     110012 uk           uk europe
manhatten 669923   669923 usa          usa usa
stratra 220022     220022 srilanka     srilanka asia

So the result should be:

((stratra,asia),2)
((stratra,europe),1)

Alternatively, a better approach would also be welcome.

// Define three case classes
case class Address(line: String, postalCode: Int)
case class PostalCode(postalCode: Int, country: String)
case class Continent(country: String, continent: String)

val address = sc.textFile("hdfs://test/address.txt")
val postalCode = sc.textFile("hdfs://test/postalcode.txt")
val continent = sc.textFile("hdfs://test/continent.txt")

// Keep only "stratra" lines and parse them into Address objects
val addressRdd = address.map(x => x.split(" ")).filter(v => v(0) == "stratra").map(line => Address(line(0), line(1).toInt))

// Split on a space (split("") would split into single characters)
val postalRdd = postalCode.map(x => x.split(" ")).map(line => PostalCode(line(0).toInt, line(1)))

val continentRdd = continent.map(x => x.split(" ")).map(line => Continent(line(0), line(1)))

// Now I try to join address and postal code on the postal code
val addressKey = addressRdd.map(line => (line.postalCode, line))
val postalKey = postalRdd.map(line => (line.postalCode, line))
val joinAddPostal = addressKey.join(postalKey)

But I'm not getting the desired result to process further.
How can I achieve this? Thanks in advance.
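A minimal sketch of more defensive parsing for the attempt above, assuming each file holds exactly two whitespace-separated fields per line as in the sample data. Malformed lines and non-numeric postal codes are dropped instead of throwing a `NumberFormatException` inside a Spark task. The case classes mirror the question's, capitalized; the object `ParseSketch` and its methods are hypothetical names.

```scala
import scala.util.Try

// Capitalized versions of the question's case classes
case class Address(line: String, postalCode: Int)
case class PostalCode(postalCode: Int, country: String)
case class Continent(country: String, continent: String)

// Hypothetical helper object: each parser returns None for a bad line
object ParseSketch {
  // Split on runs of whitespace and accept only lines with exactly two fields
  private def fields(raw: String): Option[(String, String)] =
    raw.trim.split("\\s+") match {
      case Array(a, b) => Some((a, b))
      case _           => None
    }

  // "stratra 110011" -> Address("stratra", 110011); non-numeric code -> None
  def parseAddress(raw: String): Option[Address] =
    fields(raw).flatMap { case (line, code) =>
      Try(code.toInt).toOption.map(Address(line, _))
    }

  // "110011 india" -> PostalCode(110011, "india")
  def parsePostalCode(raw: String): Option[PostalCode] =
    fields(raw).flatMap { case (code, country) =>
      Try(code.toInt).toOption.map(PostalCode(_, country))
    }

  // "india asia" -> Continent("india", "asia")
  def parseContinent(raw: String): Option[Continent] =
    fields(raw).map { case (country, continent) => Continent(country, continent) }
}
```

In Spark, something like `address.flatMap(s => ParseSketch.parseAddress(s).toList)` would then yield an `RDD[Address]` that silently skips unparseable lines.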


Solution

  • You are almost there; you just need to put some additional logic together.

        // A single common case class, so that the RDD joins are easier to write
        case class Result(line: String, postalCode: Int, country: String, continent: String)
    
        val address = sc.textFile("D://texts/address.txt")
        val postalCode = sc.textFile("D://texts/postalcode.txt")
        val continent = sc.textFile("D://texts/continent.txt")
    
        // Address keyed by postal code
        val filteredAddress = address.filter(line => line.startsWith("stratra")).map(line => {
          val splits = line.split(" ")
          (splits(1).toInt, Result(splits(0), splits(1).toInt, "", ""))
        })
    
        // Postal codes keyed by postal code
        val postalCodes = postalCode.map(line => {
          val splits = line.split(" ")
          (splits(0).toInt, Result("", splits(0).toInt, splits(1), ""))
        })
    
        // Join address and postal code ON postal code, then convert it to
        // RDD keyed by country
        val addressPostalCode = filteredAddress.join(postalCodes)
          .map { case (_, (addr, postal)) =>
            (postal.country, Result(addr.line, addr.postalCode, postal.country, postal.continent))
          }
    
        // Continent keyed by country
        val continents = continent.map(line => {
          val splits = line.split(" ")
          (splits(0), Result("", 0, splits(0), splits(1)))
        })
    
        // Join continents with address+postalCode rdd
        val merged = continents.join(addressPostalCode)
        val grouped = merged
          .map { case (_, (cont, addr)) => ((addr.line, cont.continent), 1) }
          .reduceByKey(_ + _)
          .sortBy(_._2, ascending = false)
    
        // Check result
        grouped.take(2)
    

    The result is:

    scala> grouped.count
    res39: Long = 2
    
    scala> grouped.foreach(println(_))
    ((stratra,asia),2)
    ((stratra,europe),1)
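To check the join logic without a cluster, here is a minimal sketch that replays the answer's pipeline (filter, join on postal code, re-key by country, join on country, count per key) on plain Scala collections. `JoinSketch` and `innerJoin` are hypothetical names; `innerJoin` only mimics the inner-join semantics of Spark's pair-RDD `join` and does no partitioning or shuffling.

```scala
// Hypothetical local stand-in for the RDD pipeline in the answer
object JoinSketch {
  // Inner join: for every key present on both sides, emit (key, (left, right))
  // once per matching pair, like PairRDDFunctions.join
  def innerJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
    val rightByKey: Map[K, Seq[B]] =
      right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    for {
      (k, a) <- left
      b      <- rightByKey.getOrElse(k, Seq.empty)
    } yield (k, (a, b))
  }

  def countByLineAndContinent(
      addresses: Seq[(String, Int)],     // (line, postal code)
      postals: Seq[(Int, String)],       // (postal code, country)
      continents: Seq[(String, String)]  // (country, continent)
  ): Map[(String, String), Int] = {
    // Addresses keyed by postal code, filtered like the answer's startsWith
    val byCode = addresses
      .filter(_._1.startsWith("stratra"))
      .map { case (line, code) => (code, line) }
    // Join on postal code, then re-key by country
    val byCountry = innerJoin(byCode, postals)
      .map { case (_, (line, country)) => (country, line) }
    // Join on country, then count per (line, continent), like reduceByKey(_ + _)
    innerJoin(byCountry, continents)
      .map { case (_, (line, continent)) => (line, continent) }
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size }
  }
}
```

Fed the question's sample data, `countByLineAndContinent` returns a map containing `(stratra,asia) -> 2` and `(stratra,europe) -> 1`, the same counts as the Spark output above.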
    

    The same can also be achieved with the DataFrame API, as in this short example:

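    // spark-shell imports this automatically; a standalone application needs it for toDF
    import spark.implicits._

    val address = Seq(("stratra", 110011), ("knagar", 660011), ("stratra", 110012), ("manhatten", 669923), ("stratra", 220022)).toDF("line", "postal")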
    
    val postals = Seq(("india", 110011), ("usa", 660011), ("usa", 669923), ("uk", 110012), ("srilanka", 220022)).toDF("country", "postal")
    
    val continent = Seq(("india", "asia"), ("usa", "usa"), ("uk", "europe"), ("usa", "usa"), ("srilanka", "asia")).toDF("country", "continent")
    
    // Filter address starting with "stratra", join with postal on code, join result with continent on country
    val joined = address.filter(row => row.getAs[String]("line").startsWith("stratra")).join(postals, "postal").join(continent, "country")
    
    // Result grouped by line and continent with group count
    val result = joined.select("line", "continent").groupBy("line", "continent").count()
    
    // Check result
    result.show(false)
    

    The result is:

    scala> result.show
    +-------+---------+-----+
    |   line|continent|count|
    +-------+---------+-----+
    |stratra|     asia|    2|
    |stratra|   europe|    1|
    +-------+---------+-----+
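Since the postal-code and continent tables are tiny compared with a typical address file, another option (not what either approach above does) is a map-side lookup join: collect the small tables into Maps and resolve each address with two lookups, which avoids both shuffles. This minimal sketch shows the idea on plain Scala collections under that assumption; in Spark the Maps would usually be shared with `sc.broadcast(...)` and read through `.value` inside `rdd.flatMap`. `LookupJoinSketch` is a hypothetical name.

```scala
// Hypothetical sketch of a lookup (map-side) join on local collections
object LookupJoinSketch {
  def countByLineAndContinent(
      addresses: Seq[(String, Int)],          // (line, postal code)
      postalToCountry: Map[Int, String],      // small table as a Map
      countryToContinent: Map[String, String] // small table as a Map
  ): Map[(String, String), Int] =
    addresses
      .filter(_._1.startsWith("stratra"))
      .flatMap { case (line, code) =>
        // Option chaining drops addresses whose code or country is unknown,
        // which matches inner-join behaviour
        for {
          country   <- postalToCountry.get(code)
          continent <- countryToContinent.get(country)
        } yield (line, continent)
      }
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size }
}
```

One difference from a real join: a `Map` keeps a single value per key, so if a postal code mapped to several countries, the lookup version would count it once while `join` would count every match.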