Search code examples
pythonpysparkgeopandas

The fastest way of pyspark and geodataframe to check if a point is contained in a polygon


A CSV file read through pyspark contains tens of thousands of GPS information (lat, lon) and a feather file read through geodataframe contains millions of polygon information.

In the previous question (The best algorithm to get index with specific range at pandas dataframe), I succeeded in creating a geodataframe.

However, it was confirmed that a large amount of calculation time was consumed when using geodataframe with data read from pyspark.

First, data was loaded through pyspark in the following.

data = spark.read.option("header", True).csv("path/file.csv")

The data contains second-by-second GPS information about the vehicle as follows:

time Vehicle.Location.Latitude Vehicle.Location.Longitude
2023-01-01 00:00:00 37.123456 123.123456
2023-01-01 00:00:01 37.123457 123.123457

Second, the previously created geodataframe data was loaded as follows

gdf = gpd.read_feather("/path/file.feather")

The geodataframe contains geometry information as follow:

id MAX_SPD geometry
0 60 POLYGON ((126.27306 33.19865, 126.27379 33.198...
1 60 POLYGON ((126.27222 33.19865, 126.27306 33.198...

Next, I created a user-defined function within pyspark.

The purpose is to find out the MAX_SPD value of the polygon that contains given GPS information. If it is included in multiple polygons, max(MAX_SPD) is retrieved.

def find_intersection(longitude, latitude):
    if type(longitude) != float or type(latitude) != float:
        return -1
    mgdf = gdf['geometry'].contains(Point(longitude, latitude))
    max_spd = gdf.loc[mgdf, 'MAX_SPD'].max()
    if math.isnan(max_spd):
        max_spd = -1
    return max_spd

find_intersection_udf = udf(find_intersection, IntegerType())
data = data.withColumn("max_spd", find_intersection_udf(col("`Vehicle.Location.Longitude`"), col("`Vehicle.Location.Latitude`")))
data.select("`max_spd`").show()

However, Using user-defined function seems to be time consuming. Are there any other good ideas to redue time consumption?


Solution

  • You can use Apache Sedona for your geospatial analysis.

    https://sedona.apache.org/latest-snapshot/tutorial/sql/

    I adapted this notebook to give you an example of how to do this. All you have to do is readin your Points CSV into point_rdd and polygon data in feather file format into Geopandas dataframe and then convert into polygon_rdd. Do a join query and get results. The results will have your attributes like MAX_SPD and then you do a aggregate query to retrieve the max value i.e. max(MAX_SPD) .

    Notebook :

    https://github.com/apache/sedona/blob/master/binder/ApacheSedonaCore.ipynb

    Data used in the following script can be found at this location :

    https://github.com/apache/sedona/tree/master/binder/data

    Requirements :

    pip install apache-sedona

    pip install geopandas

    pip install folium

    Python script :

    import sys
    
    import folium
    from pyspark import StorageLevel
    import geopandas as gpd
    import pandas as pd
    from pyspark.sql.types import StructType
    from pyspark.sql.types import StructField
    from sedona.spark import *
    
    
    config = SedonaContext.builder() .\
        config('spark.jars.packages',
               'org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.1,'
               'org.datasyslab:geotools-wrapper:1.4.0-28.2'). \
        config("spark.serializer", KryoSerializer.getName). \
        config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
        getOrCreate()
    
    sedona = SedonaContext.create(config)
    
    sc = sedona.sparkContext
    
    point_rdd = PointRDD(sc, "../sedona_data/arealm-small.csv", 1, FileDataSplitter.CSV, True, 10, StorageLevel.MEMORY_ONLY, "epsg:4326", "epsg:4326")
    
    ## Getting approximate total count
    count_approx = point_rdd.approximateTotalCount
    print("approximate count", count_approx)
    
    # To run analyze please use function analyze
    print("analyse is here", point_rdd.analyze())
    
    # collect to Python list
    points_List = point_rdd.rawSpatialRDD.collect()[:5]
    
    print("Printng point list")
    print(points_List)
    
    point_rdd_to_geo = point_rdd.rawSpatialRDD.map(lambda x: [x.geom, *x.getUserData().split("\t")])
    
    point_gdf = gpd.GeoDataFrame(
        point_rdd_to_geo.collect(), columns=["geom", "attr1", "attr2", "attr3"], geometry="geom"
    )
    
    print("GeoPandas Dataframe")
    print(point_gdf[:5])
    
    
    spatial_df = Adapter.\
        toDf(point_rdd, ["attr1", "attr2", "attr3"], sedona).\
        createOrReplaceTempView("spatial_df")
    
    spatial_gdf = sedona.sql("Select attr1, attr2, attr3, geometry as geom from spatial_df")
    
    spatial_gdf.show(5, False)
    
    
    ## Apache Sedona spatial partitioning method can
    # significantly speed up the join query.
    # Three spatial partitioning methods are available:
    # KDB-Tree, Quad-Tree and R-Tree.
    # Two SpatialRDD must be partitioned by the same way.
    
    
    ## Very Important : Choose the Same Spatial Paritioning Everywhere. in this case GridType.KDBTREE
    done_value = point_rdd.spatialPartitioning(GridType.KDBTREE)
    print("spatial partitioniing done value = ", done_value)
    
    polygon_rdd = PolygonRDD(sc, "../sedona_data/primaryroads-polygon.csv", FileDataSplitter.CSV, True, 11, StorageLevel.MEMORY_ONLY, "epsg:4326", "epsg:4326")
    print(polygon_rdd.analyze())
    polygon_rdd.spatialPartitioning(point_rdd.getPartitioner())  ### This retrieves the same paritioner i.e. GridType.KDBTREE
    
    # building an index
    point_rdd.buildIndex(IndexType.RTREE, True)
    polygon_rdd.buildIndex(IndexType.RTREE, True)
    
    # Perform Spatial Join Query
    result = JoinQuery.SpatialJoinQueryFlat(point_rdd, polygon_rdd, True, False)
    
    print("Result of the Join Query")
    print(result.take(2))
    
    
    schema = StructType(
        [
            StructField("geom_left", GeometryType(), False),
            StructField("geom_right", GeometryType(), False)
        ]
    )
    
    # Set verifySchema to False
    spatial_join_result = result.map(lambda x: [x[0].geom, x[1].geom])
    sedona_df = sedona.createDataFrame(spatial_join_result, schema, verifySchema=False)
    
    print("Schema of Sedona Dataframe")
    print(sedona_df.printSchema())
    
    print("Values here")
    sedona_df.show(n=10, truncate=False)
    
    
    gdf = gpd.GeoDataFrame(sedona_df.rdd.collect(), columns=["geom_left", "geom_right"], geometry="geom_left")
    
    gdf_points = gpd.GeoDataFrame(sedona_df.rdd.collect(), columns=["geom_left", "geom_right"], geometry="geom_right")
    
    
    
    folium_map = folium.Map(zoom_start=0.0001, tiles="CartoDB positron")
    
    for _, r in gdf_points.iterrows():
        # Without simplifying the representation of each borough,
        # the map might not be displayed
        sim_geo = gpd.GeoSeries(r["geom_right"]).simplify(tolerance=0.000001)
    
        geo_j = sim_geo.to_json()
        geo_j = folium.GeoJson(data=geo_j, style_function=lambda x: {"fillColor": "red"})
        geo_j.add_to(folium_map)
    
    
    for _, r in gdf.iterrows():
        # Without simplifying the representation of each borough,
        # the map might not be displayed
        sim_geo = gpd.GeoSeries(r["geom_left"]).simplify(tolerance=0.000001)
    
    
        geo_j = sim_geo.to_json()
        geo_j = folium.GeoJson(data=geo_j, style_function=lambda x: {"fillColor": "yellow"})
        geo_j.add_to(folium_map)
    
    folium_map.show_in_browser()
    

    Output :

    approximate count 3000
    analyse is here True
    Printng point list
    [Geometry: Point userData: testattribute0   testattribute1  testattribute2, Geometry: Point userData: testattribute0    testattribute1  testattribute2, Geometry: Point userData: testattribute0    testattribute1  testattribute2, Geometry: Point userData: testattribute0    testattribute1  testattribute2, Geometry: Point userData: testattribute0    testattribute1  testattribute2]
    GeoPandas Dataframe
                             geom           attr1           attr2           attr3
    0  POINT (-88.33149 32.32414)  testattribute0  testattribute1  testattribute2
    1  POINT (-88.17593 32.36076)  testattribute0  testattribute1  testattribute2
    2  POINT (-88.38895 32.35707)  testattribute0  testattribute1  testattribute2
    3  POINT (-88.22110 32.35078)  testattribute0  testattribute1  testattribute2
    4  POINT (-88.32399 32.95067)  testattribute0  testattribute1  testattribute2
    +--------------+--------------+--------------+----------------------------+
    |attr1         |attr2         |attr3         |geom                        |
    +--------------+--------------+--------------+----------------------------+
    |testattribute0|testattribute1|testattribute2|POINT (-88.331492 32.324142)|
    |testattribute0|testattribute1|testattribute2|POINT (-88.175933 32.360763)|
    |testattribute0|testattribute1|testattribute2|POINT (-88.388954 32.357073)|
    |testattribute0|testattribute1|testattribute2|POINT (-88.221102 32.35078) |
    |testattribute0|testattribute1|testattribute2|POINT (-88.323995 32.950671)|
    +--------------+--------------+--------------+----------------------------+
    only showing top 5 rows
    
    spatial partitioniing done value =  True
    True
    Result of the Join Query
    [[Geometry: Polygon userData: , Geometry: Point userData: testattribute0    testattribute1  testattribute2], [Geometry: Polygon userData: , Geometry: Point userData: testattribute0    testattribute1  testattribute2]]
    Schema of Sedona Dataframe
    root
     |-- geom_left: geometry (nullable = false)
     |-- geom_right: geometry (nullable = false)
    
    None
    Values here
    +------------------------------------------------------------------------------------------------------------------------+----------------------------+
    |geom_left                                                                                                               |geom_right                  |
    +------------------------------------------------------------------------------------------------------------------------+----------------------------+
    |POLYGON ((-88.40049 30.474455, -88.40049 30.692167, -88.006968 30.692167, -88.006968 30.474455, -88.40049 30.474455))   |POINT (-88.35457 30.634836) |
    |POLYGON ((-88.40047 30.474213, -88.40047 30.691941, -88.007269 30.691941, -88.007269 30.474213, -88.40047 30.474213))   |POINT (-88.35457 30.634836) |
    |POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.347036 32.454904)|
    |POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.39203 32.507796) |
    |POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.349276 32.548266)|
    |POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.329313 32.618924)|
    |POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032))  |POINT (-88.347036 32.454904)|
    |POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032))  |POINT (-88.39203 32.507796) |
    |POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032))  |POINT (-88.349276 32.548266)|
    |POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032))  |POINT (-88.329313 32.618924)|
    +------------------------------------------------------------------------------------------------------------------------+----------------------------+
    only showing top 10 rows
    
    Your map should have been opened in your browser automatically.
    

    Following is the map opened in browser.

    enter image description here