Tags: scala, apache-spark, dataframe, rdd

How to access external dataframe in rdd map function?


I have two dataframes.

countryDF

+-------+-------------------+--------+---------+
|   id  |    CountryName    |Latitude|Longitude|
+-------+-------------------+--------+---------+
|  1    | United States     |  39.76 |   -98.5 |
|  2    | China             |  35    |   105   |
|  3    | India             |  20    |   77    |
|  4    | Brazil            |  -10   |   -55   |
...
+-------+-------------------+--------+---------+

salesDF

+-------+-------------------+--------+---------+--------+
|   id  |    Country        |Latitude|Longitude|revenue |
+-------+-------------------+--------+---------+--------+
|  1    | Japan             |        |         |   11   |
|  2    | China             |        |         |   12   |
|  3    | Brazil            |        |         |   56   |
|  4    | Scotland          |        |         |   12   |
...
+-------+-------------------+--------+---------+--------+

The task is to fill in latitude and longitude for salesDF: for each value in the salesDF "Country" column, look up the matching row in the countryDF "CountryName" column and, if one is found, copy its "Latitude" and "Longitude" into salesDF.

The output dataframe is:

+-------+-------------------+--------+---------+---------+
|   id  |    CountryName    |Latitude|Longitude|revenue  |
+-------+-------------------+--------+---------+---------+
|  1    | Japan             |  35.6  |   139   | 11      |
|  2    | China             |  35    |   105   | 12      |
|  3    | Brazil            |  -10   |   -55   | 56      |
|  4    | Scotland          |  55.95 |  -3.18  | 12      |
...
+-------+-------------------+--------+---------+---------+

I wrote a map function to do this, but it seems the map function cannot access the external dataframe variable. Any solutions?

val countryDF = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("Country.csv")

var revenueDF = spark.read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("revenue.csv")

var resultRdd = revenueDF.rdd.map(row => {
  val generateRow = (row: Row, latitude: Any, longitude: Any, latitudeIndex: Int, longitudeIndex: Int) => {
    val arr = row.toSeq.toArray
    arr(latitudeIndex) = latitude
    arr(longitudeIndex) = longitude
    Row.fromSeq(arr)
  }
  val countryName = row.getAs[String](1)
  // fails here: countryDF cannot be used inside map, it comes through corrupted
  val countryRow = countryDF.where(col("CountryName") === countryName)
  generateRow(row, row.getAs[String](2), row.getAs[String](3), 2, 3)
})
revenueDF.sqlContext.createDataFrame(resultRdd, revenueDF.schema).show()

Solution

  • The operation you're looking for is a join:

    salesDF.select("id", "Country", "revenue").join(
      countryDF.select("CountryName", "Latitude", "Longitude"),
      $"Country" === $"CountryName",
      "left"
    ).drop("Country")
    

    And no, you cannot use DataFrames, RDDs, or other distributed objects inside map, udf, or their equivalents. Those functions are serialized and run on the executors, where a DataFrame handle created on the driver is not valid.
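    If countryDF is small enough to fit on the driver, an alternative sketch is to collect it into a plain Map, broadcast that, and look coordinates up inside a UDF. A local Map is ordinary serializable data, so closures may capture it, unlike a DataFrame. Column names and types below are assumed from the question (inferSchema is assumed to have read the coordinates as doubles):

    ```scala
    import org.apache.spark.sql.functions.udf
    import spark.implicits._

    // Collect the (small) country table to the driver as a lookup Map,
    // then broadcast it to all executors. Assumes countryDF fits in memory
    // and that Latitude/Longitude were inferred as DoubleType.
    val countryMap: Map[String, (Double, Double)] = countryDF
      .select("CountryName", "Latitude", "Longitude")
      .collect()
      .map(r => r.getString(0) -> (r.getDouble(1), r.getDouble(2)))
      .toMap

    val bCountries = spark.sparkContext.broadcast(countryMap)

    // UDFs may close over the broadcast value; returning Option[Double]
    // yields a nullable column when the country is not found.
    val latUdf = udf((c: String) => bCountries.value.get(c).map(_._1))
    val lonUdf = udf((c: String) => bCountries.value.get(c).map(_._2))

    val result = salesDF
      .withColumn("Latitude", latUdf($"Country"))
      .withColumn("Longitude", lonUdf($"Country"))
    ```

    Note that this is essentially what Spark's broadcast-hash join does for you automatically when one side of a join is small, so the plain join above remains the idiomatic answer.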