I have two dataframes.
countryDF
+-------+-------------------+--------+---------+
| id | CountryName |Latitude|Longitude|
+-------+-------------------+--------+---------+
| 1 | United States | 39.76 | -98.5 |
| 2 | China | 35 | 105 |
| 3 | India | 20 | 77 |
| 4 | Brazil | -10 | -55 |
...
+-------+-------------------+--------+---------+
salesDF
+-------+-------------------+--------+---------+--------+
| id | Country |Latitude|Longitude|revenue |
+-------+-------------------+--------+---------+--------+
| 1 | Japan | | | 11 |
| 2 | China | | | 12 |
| 3 | Brazil | | | 56 |
| 4 | Scotland | | | 12 |
...
+-------+-------------------+--------+---------+--------+
The task is to generate latitude and longitude for salesDF: for each row, look up the value in salesDF's "Country" column against countryDF's "CountryName" column, and if a matching row is found, append the corresponding "Latitude" and "Longitude" to it.
The output dataframe is:
+-------+-------------------+--------+---------+---------+
| id | CountryName |Latitude|Longitude|revenue |
+-------+-------------------+--------+---------+---------+
| 1 | Japan | 35.6 | 139 | 11 |
| 2 | China | 35 | 105 | 12 |
| 3 | Brazil | -10 | -55 | 56 |
| 4 | Scotland | 55.95 | -3.18 | 12 |
...
+-------+-------------------+--------+---------+---------+
I wrote a map function to do the operation, but it seems the map function cannot access an external dataframe variable. Any solutions?
val countryDF = spark.read
.option("inferSchema", "true")
.option("header", "true")
.csv("Country.csv")
var revenueDF = spark.read
.option("inferSchema", "true")
.option("header", "true")
.csv("revenue.csv")
var resultRdd = revenueDF.rdd.map(row => {
  val generateRow = (row: Row, latitude: Any, longitude: Any, latitudeIndex: Int, longitudeIndex: Int) => {
    val arr = row.toSeq.toArray
    arr(latitudeIndex) = latitude
    arr(longitudeIndex) = longitude
    Row.fromSeq(arr)
  }
  val countryName = row.getAs[String](1)
  // cannot access countryDF here -- it is corrupted
  val countryRow = countryDF.where(col("CountryName") === countryName)
  generateRow(row, row.getAs[String](2), row.getAs[String](3), 2, 3)
})
revenueDF.sqlContext.createDataFrame(resultRdd, revenueDF.schema).show()
The operation you're looking for is a join:
salesDF.select("id", "Country", "revenue").join(
  countryDF.select("CountryName", "Latitude", "Longitude"),
  $"CountryName" === $"Country",
  "left"
).drop("Country")
And no, you cannot use DataFrames, RDDs, or other distributed objects inside map, udf, or their equivalents. Distributed objects exist only on the driver; the closure you pass to map runs on the executors, where the reference is meaningless.
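To see why the join is the right tool: conceptually it is a left lookup of each sales row's country against the country table, keeping unmatched rows with empty coordinates. A minimal sketch of that semantics with plain Scala collections (hypothetical in-memory stand-ins for countryDF and salesDF, no Spark required) -- this is also roughly what Spark does in a broadcast join when the country table is small:

```scala
// Hypothetical in-memory stand-ins for the two dataframes.
case class Country(name: String, lat: Double, lon: Double)
case class Sale(id: Int, country: String, revenue: Int)

object JoinSketch {
  val countries = Seq(
    Country("China", 35.0, 105.0),
    Country("Brazil", -10.0, -55.0)
  )
  val sales = Seq(
    Sale(1, "Japan", 11),
    Sale(2, "China", 12),
    Sale(3, "Brazil", 56)
  )

  // Build a lookup map once from the small table, then left-join:
  // each sale keeps its row; unmatched countries yield None.
  val byName: Map[String, Country] = countries.map(c => c.name -> c).toMap
  val joined: Seq[(Sale, Option[Country])] =
    sales.map(s => (s, byName.get(s.country)))
}
```

The point is that the lookup table is materialized once and shipped to where each row is processed, instead of each row trying to query a distributed object from inside a closure.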