Tags: scala, apache-spark, h3, geospark

How to create a PolygonRDD from H3 boundary?


I'm using Apache Spark with Apache Sedona (previously called GeoSpark), and I'm trying to do the following:

  1. Take a DataFrame with a latitude and a longitude in each row (it comes from an arbitrary source; it is neither a PointRDD nor loaded from a specific file format) and transform it into a DataFrame with the H3 index of each point.
  2. Take that DataFrame and create a PolygonRDD containing the H3 cell boundaries of each distinct H3 index.

This is what I have so far:

import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.sedona.core.spatialRDD.PolygonRDD
import org.apache.sedona.sql.utils.SedonaSQLRegistrator
import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator
import org.locationtech.jts.geom.{Polygon, GeometryFactory, Coordinate}
import com.uber.h3core.H3Core
import com.uber.h3core.util.GeoCoord

object Main {
  def main(args: Array[String]) {
    val sparkSession: SparkSession = SparkSession
      .builder()
      .config("spark.serializer", classOf[KryoSerializer].getName)
      .config("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName)
      .master("local[*]")
      .appName("Sedona-Analysis")
      .getOrCreate()
    import sparkSession.implicits._

    SedonaSQLRegistrator.registerAll(sparkSession)
    SedonaVizRegistrator.registerAll(sparkSession)

    val df = Seq(
      (-8.01681, -34.92618),
      (-25.59306, -49.39895),
      (-7.17897, -34.86518),
      (-20.24521, -42.14273),
      (-20.24628, -42.14785),
      (-27.01641, -50.94109),
      (-19.72987, -47.94319)
    ).toDF("latitude", "longitude")

    val core: H3Core = H3Core.newInstance()
    val geoFactory = new GeometryFactory()
    val geoToH3 = udf((lat: Double, lng: Double, res: Int) => core.geoToH3(lat, lng, res))

    val trdd = df
      .select(geoToH3($"latitude", $"longitude", lit(7)).as("h3index"))
      .distinct()
      .rdd
      .map(row => {
        val h3 = row.getAs[Long](0)
        val lboundary = core.h3ToGeoBoundary(h3)
        val aboundary = lboundary.toArray(Array.ofDim[GeoCoord](lboundary.size))
        val poly = geoFactory.createPolygon(
          aboundary.map((c: GeoCoord) => new Coordinate(c.lat, c.lng))
        )
        poly.setUserData(h3)
        poly
      })
    val polyRDD = new PolygonRDD(trdd)
    polyRDD.rawSpatialRDD.foreach(println)

    sparkSession.stop()
  }
}

However, after building the jar with sbt assembly and running it with spark-submit, I get this error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
    at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:396)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.map(RDD.scala:395)
    at Main$.main(Main.scala:44)
    at Main.main(Main.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.uber.h3core.H3Core
Serialization stack:
    - object not serializable (class: com.uber.h3core.H3Core, value: com.uber.h3core.H3Core@3407ded1)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 2)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class Main$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic Main$.$anonfun$main$2:(Lcom/uber/h3core/H3Core;Lorg/locationtech/jts/geom/GeometryFactory;Lorg/apache/spark/sql/Row;)Lorg/locationtech/jts/geom/Polygon;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Lorg/locationtech/jts/geom/Polygon;, numCaptured=2])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class Main$$$Lambda$1710/0x0000000840d7f040, Main$$$Lambda$1710/0x0000000840d7f040@4853f592)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
    ... 22 more

What is the proper way to achieve what I'm trying to do?


Solution

  • So, basically, moving the H3Core (and the GeometryFactory) into a singleton object that extends Serializable was enough: the map closure then references the object instead of capturing the non-serializable H3Core instance directly. I also had to make the Coordinate array begin and end with the same point, because JTS only accepts closed rings when building a polygon.

    import org.apache.spark.serializer.KryoSerializer
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.sedona.core.spatialRDD.PolygonRDD
    import org.apache.sedona.sql.utils.SedonaSQLRegistrator
    import org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
    import org.apache.sedona.viz.sql.utils.SedonaVizRegistrator
    import org.locationtech.jts.geom.{Polygon, GeometryFactory, Coordinate}
    import com.uber.h3core.H3Core
    import com.uber.h3core.util.GeoCoord
    
    // Keeping the non-serializable helpers in a singleton means the closure only
    // references the object; each executor JVM initializes its own instance lazily.
    object H3 extends Serializable {
      val core = H3Core.newInstance()
      val geoFactory = new GeometryFactory()
    }
    
    object Main {
      def main(args: Array[String]) {
        val sparkSession: SparkSession = SparkSession
          .builder()
          .config("spark.serializer", classOf[KryoSerializer].getName)
          .config("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName)
          .master("local[*]")
          .appName("Sedona-Analysis")
          .getOrCreate()
        import sparkSession.implicits._
    
        SedonaSQLRegistrator.registerAll(sparkSession)
        SedonaVizRegistrator.registerAll(sparkSession)
    
        val df = Seq(
          (-8.01681, -34.92618),
          (-25.59306, -49.39895),
          (-7.17897, -34.86518),
          (-20.24521, -42.14273),
          (-20.24628, -42.14785),
          (-27.01641, -50.94109),
          (-19.72987, -47.94319)
        ).toDF("latitude", "longitude")
    
        // The UDF reaches H3Core through the singleton, so its closure stays serializable.
        val geoToH3 = udf((lat: Double, lng: Double, res: Int) => H3.core.geoToH3(lat, lng, res))
    
        val trdd = df
          .select(geoToH3($"latitude", $"longitude", lit(7)).as("h3index"))
          .distinct()
          .rdd
          .map(row => {
            val h3 = row.getAs[Long](0)
            val lboundary = H3.core.h3ToGeoBoundary(h3)
            val aboundary = lboundary.toArray(Array.ofDim[GeoCoord](lboundary.size))
            val poly = H3.geoFactory.createPolygon({
              // Coordinates are kept in (lat, lng) order here; note that JTS
              // conventionally treats x as longitude and y as latitude.
              val ps = aboundary.map((c: GeoCoord) => new Coordinate(c.lat, c.lng))
              // JTS requires a closed LinearRing, so repeat the first point at the end.
              ps :+ ps(0)
            })
            poly.setUserData(h3)
            poly
          })
        
        val polyRDD = new PolygonRDD(trdd)
        polyRDD.rawSpatialRDD.foreach(println)
    
        sparkSession.stop()
      }
    }
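
  • A note on why the fix works: a Scala object is initialized once per JVM, so each executor builds its own H3Core the first time it touches H3, and the closure only carries a serializable reference to the singleton. If you would rather avoid the singleton, an alternative is to create the non-serializable helpers once per partition with mapPartitions. This is a minimal sketch under the same imports, not part of the original answer (and the geoToH3 UDF would need the same treatment if the H3 object is removed):

    val trdd = df
      .select(geoToH3($"latitude", $"longitude", lit(7)).as("h3index"))
      .distinct()
      .rdd
      .mapPartitions { rows =>
        // Built once per partition on the executor, so nothing
        // non-serializable is captured by the closure.
        val core = H3Core.newInstance()
        val geoFactory = new GeometryFactory()
        rows.map { row =>
          val h3 = row.getAs[Long](0)
          val boundary = core.h3ToGeoBoundary(h3)
          val ps = boundary
            .toArray(Array.ofDim[GeoCoord](boundary.size))
            .map((c: GeoCoord) => new Coordinate(c.lat, c.lng))
          val poly = geoFactory.createPolygon(ps :+ ps(0)) // closed ring
          poly.setUserData(h3)
          poly
        }
      }

    The per-partition variant also amortizes the cost of H3Core.newInstance(), which loads a native library, across all rows of a partition.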