I'm new to Scala and Spark and need help with the following.
I have three files (address, postal_code and continent) that I have read as RDDs. Now I need to count, per continent, the addresses that have "stratra" in the address line, using Scala and Spark.
For example:
address              postalcode           continent
stratra 110011       110011 india         india asia
knagar 660011        660011 usa           usa usa
stratra 110012       110012 uk            uk europe
manhatten 669923     669923 usa           usa usa
stratra 220022       220022 srilanka      srilanka asia
So the result should be:
((stratra,asia),2)
((stratra,europe),1)
Or a better option, if you can suggest one.
// Define three case classes in Scala:
case class Address(line: String, postalcode: Int)
case class PostalCode(postalcode: Int, country: String)
case class Continent(country: String, continent: String)
val address = sc.textFile("hdfs://test/address.txt")
val postalcode = sc.textFile("hdfs://test/postalcode.txt")
val continent = sc.textFile("hdfs://test/continent.txt")
val addressRdd = address.map(x => x.split(" ")).filter(v => v(0) == "stratra").map(line => Address(line(0), line(1).toInt))
val postalRdd = postalcode.map(x => x.split(" ")).map(line => PostalCode(line(0).toInt, line(1)))
val continentRdd = continent.map(x => x.split(" ")).map(line => Continent(line(0), line(1)))
// Now I try to join address and postalcode on postalcode
val addresskey=addressRdd.map(line=>(line.postalcode,line))
val postalkey=postalRdd.map(line=>(line.postalcode,line))
val joinaddpostal=addresskey.join(postalkey)
But I'm not getting the desired result to process further.
How can I achieve this? Thanks in advance.
You are almost there; you just need to put some additional logic together.
// One common case class, so that the RDD joins are easier to perform
case class Result(line: String, postalCode: Int, country: String, continent: String)
val address = sc.textFile("D://texts/address.txt")
val postalCode = sc.textFile("D://texts/postalcode.txt")
val continent = sc.textFile("D://texts/continent.txt")
// Address keyed by postal code
val filteredAddress = address.filter(line => line.startsWith("stratra")).map(line => {
  val splits = line.split(" ")
  (splits(1).toInt, Result(splits(0), splits(1).toInt, "", ""))
})
// Postal codes keyed by postal code
val postalCodes = postalCode.map(line => {
  val splits = line.split(" ")
  (splits(0).toInt, Result("", splits(0).toInt, splits(1), ""))
})
// Join address and postal code ON postal code, then convert it to
// RDD keyed by country
val addressPostalCode = filteredAddress.join(postalCodes)
  .map(f => (f._2._2.country, Result(f._2._1.line, f._2._1.postalCode, f._2._2.country, f._2._2.continent)))
// Continent keyed by country
val continents = continent.map(line => {
  val splits = line.split(" ")
  (splits(0), Result("", 0, splits(0), splits(1)))
})
// Join continents with address+postalCode rdd
val merged = continents.join(addressPostalCode)
val grouped = merged.map(f => ((f._2._2.line, f._2._1.continent), 1)).reduceByKey(_ + _).sortBy(x => x._2, false)
// Check result
grouped.take(2)
Result is:
scala> grouped.count
res39: Long = 2
scala> grouped.foreach(println(_))
((stratra,asia),2)
((stratra,europe),1)
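To sanity-check the two-step join without a cluster, the same logic can be sketched on plain Scala collections. This is only an illustration, not Spark code: the sample rows are copied from the question, and the names `addresses`, `postalToCountry`, `countryToContinent` and `counts` are made up for this sketch.

```scala
// Sample rows copied from the question; in Spark these come from the three files.
val addresses = Seq(
  ("stratra", 110011), ("knagar", 660011), ("stratra", 110012),
  ("manhatten", 669923), ("stratra", 220022))
val postalToCountry = Map(
  110011 -> "india", 660011 -> "usa", 110012 -> "uk",
  669923 -> "usa", 220022 -> "srilanka")
val countryToContinent = Map(
  "india" -> "asia", "usa" -> "usa", "uk" -> "europe", "srilanka" -> "asia")

val counts: Map[(String, String), Int] =
  addresses
    // Keep only the "stratra" address lines.
    .filter { case (line, _) => line == "stratra" }
    // Look up postal code -> country -> continent; rows without a match are dropped,
    // which mirrors the inner joins of the RDD version.
    .flatMap { case (line, postal) =>
      for {
        country   <- postalToCountry.get(postal)
        continent <- countryToContinent.get(country)
      } yield (line, continent)
    }
    // Count the occurrences of each (line, continent) pair.
    .groupBy(identity)
    .map { case (key, hits) => key -> hits.size }
```

For the sample data, `counts` should hold `(stratra,asia) -> 2` and `(stratra,europe) -> 1`, matching the RDD output.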
The same can also be achieved with the DataFrame API, as in this shortened example:
val address = Seq(("stratra", 110011), ("knagar", 660011), ("stratra", 110012), ("manhatten", 669923), ("stratra", 220022)).toDF("line", "postal")
val postals = Seq(("india", 110011), ("usa", 660011), ("usa", 669923), ("uk", 110012), ("srilanka", 220022)).toDF("country", "postal")
val continent = Seq(("india", "asia"), ("usa", "usa"), ("uk", "europe"), ("usa", "usa"), ("srilanka", "asia")).toDF("country", "continent")
// Filter address starting with "stratra", join with postal on code, join result with continent on country
val joined = address.filter(row => row.getAs[String]("line").startsWith("stratra")).join(postals, "postal").join(continent, "country")
// Result grouped by line and continent with group count
val result = joined.select("line", "continent").groupBy("line", "continent").count()
// Check result
result.show(false)
Result is:
scala> result.show
+-------+---------+-----+
|   line|continent|count|
+-------+---------+-----+
|stratra|     asia|    2|
|stratra|   europe|    1|
+-------+---------+-----+
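The DataFrame example builds its data inline. If the data lives in the three text files instead, a sketch along these lines might work. It assumes the files have no header and are space-delimited with two fields per line, as in the RDD example. `readPairs` and `countByContinent` are hypothetical helper names introduced here, and the filter uses an exact match on the address line rather than `startsWith`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper: reads a headerless, space-delimited file and names its two columns.
// Without a schema, both columns are read as strings.
def readPairs(spark: SparkSession, path: String, first: String, second: String): DataFrame =
  spark.read.option("delimiter", " ").csv(path).toDF(first, second)

// Hypothetical helper: counts the address lines equal to `street` per continent.
def countByContinent(spark: SparkSession, addressPath: String, postalPath: String,
                     continentPath: String, street: String): DataFrame = {
  val address = readPairs(spark, addressPath, "line", "postal")
  val postals = readPairs(spark, postalPath, "postal", "country")
  // Drop repeated country rows so the join does not multiply matches.
  val continent = readPairs(spark, continentPath, "country", "continent")
    .dropDuplicates("country")

  address
    .filter(col("line") === street)
    .join(postals, "postal")
    .join(continent, "country")
    .groupBy("line", "continent")
    .count()
}
```

In spark-shell, where a session named `spark` already exists, calling `countByContinent(spark, "D://texts/address.txt", "D://texts/postalcode.txt", "D://texts/continent.txt", "stratra").show(false)` should give the same counts as above, provided the files match the assumed layout.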