Tags: scala, apache-spark, rdd

How to flatten grouped Spark RDD contents as individual lines then save to file


I have an RDD[(Int, Iterable[Coordinates])] that has been grouped by key (index: Int). Coordinates is a class with the members:

latitude: Double, longitude: Double

I would like to print it, or create a CSV file, in the following form (one row for each datapoint):

index,latitude,longitude

With the non-grouped RDD[(Int, Coordinates)] it worked like this:

val textOutputRDD = initialRDD.map(
  f => f._1.toString() + "," + f._2.latitude.toString() + "," + f._2.longitude.toString())
textOutputRDD.saveAsTextFile("TextOutput")

How do I manage to do that in this case?


Solution

  • A simple nested loop will do. Here the coordinates are approximated with a plain pair of doubles:

    val rdd =
      sc.parallelize(
        Seq(
          1 -> Seq((4.1, 3.4), (5.6, 6.7), (3.4, 9.0)),
          2 -> Seq((0.4, -4.1), (-3.4, 6.7), (7.0, 8.9))
        )
      )
    
    val csvLike =
      for ((key, coords) <- rdd; (lat, lon) <- coords) yield s"$key,$lat,$lon"
    
    for (row <- csvLike) println(row)
    

    This code results in output like the following (the row order depends on how the RDD is partitioned, so it may differ between runs):

    2,0.4,-4.1
    2,-3.4,6.7
    2,7.0,8.9
    1,4.1,3.4
    1,5.6,6.7
    1,3.4,9.0
    

    Edit

    Another possible approach is to write out explicitly the flatMap/map sequence that the compiler desugars the for comprehension into:

    rdd.flatMap {
      case (key, coords) =>
        coords.map {
          case (lat, lon) => s"$key,$lat,$lon"
        }
    }
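
    To tie this back to the original question, the flattening step can be factored into a pure helper that works directly on the asker's `(Int, Iterable[Coordinates])` entries. Note the assumptions: `Coordinates` is modeled here as a case class (the question only names its two fields), and `"TextOutput"` is reused as the output path from the question. Keeping the helper Spark-free also makes it easy to check without a SparkContext:

    ```scala
    // Assumed shape of the asker's class -- only latitude/longitude are
    // given in the question, so a simple case class stands in for it.
    case class Coordinates(latitude: Double, longitude: Double)

    // Turn one grouped entry into its CSV rows, one per coordinate pair.
    def toCsvRows(entry: (Int, Iterable[Coordinates])): Iterable[String] = {
      val (index, coords) = entry
      coords.map(c => s"$index,${c.latitude},${c.longitude}")
    }

    // With the real grouped RDD this becomes a one-liner that also
    // answers the save-to-file part of the question:
    //   groupedRDD.flatMap(toCsvRows).saveAsTextFile("TextOutput")
    ```

    Each call to saveAsTextFile writes one part file per partition, just as it did for the non-grouped RDD in the question.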