A CSV file read through pyspark contains tens of thousands of GPS information (lat, lon) and a feather file read through geodataframe contains millions of polygon information.
In the previous question (The best algorithm to get index with specific range at pandas dataframe), I succeeded in creating a geodataframe.
However, it was confirmed that a large amount of calculation time was consumed when using geodataframe with data read from pyspark.
First, data was loaded through pyspark in the following.
data = spark.read.option("header", True).csv("path/file.csv")
The data contains second-by-second GPS information about the vehicle as follows:
time | Vehicle.Location.Latitude | Vehicle.Location.Longitude |
---|---|---|
2023-01-01 00:00:00 | 37.123456 | 123.123456 |
2023-01-01 00:00:01 | 37.123457 | 123.123457 |
Second, the previously created geodataframe data was loaded as follows
gdf = gpd.read_feather("/path/file.feather")
The geodataframe contains geometry information as follow:
id | MAX_SPD | geometry |
---|---|---|
0 | 60 | POLYGON ((126.27306 33.19865, 126.27379 33.198... |
1 | 60 | POLYGON ((126.27222 33.19865, 126.27306 33.198... |
Next, I created a user-defined function within pyspark.
The purpose is to find out the MAX_SPD value of the polygon that contains given GPS information. If it is included in multiple polygons, max(MAX_SPD) is retrieved.
def find_intersection(longitude, latitude):
if type(longitude) != float or type(latitude) != float:
return -1
mgdf = gdf['geometry'].contains(Point(longitude, latitude))
max_spd = gdf.loc[mgdf, 'MAX_SPD'].max()
if math.isnan(max_spd):
max_spd = -1
return max_spd
find_intersection_udf = udf(find_intersection, IntegerType())
data = data.withColumn("max_spd", find_intersection_udf(col("`Vehicle.Location.Longitude`"), col("`Vehicle.Location.Latitude`")))
data.select("`max_spd`").show()
However, Using user-defined function seems to be time consuming. Are there any other good ideas to redue time consumption?
You can use Apache Sedona for your geospatial analysis.
https://sedona.apache.org/latest-snapshot/tutorial/sql/
I adapted this notebook to give you an example of how to do this.
All you have to do is readin your Points CSV into point_rdd and polygon data in feather file format into Geopandas dataframe and then convert into polygon_rdd. Do a join query and get results. The results will have your attributes like MAX_SPD
and then you do a aggregate query to retrieve the max value i.e. max(MAX_SPD)
.
Notebook :
https://github.com/apache/sedona/blob/master/binder/ApacheSedonaCore.ipynb
Data used in the following script can be found at this location :
https://github.com/apache/sedona/tree/master/binder/data
Requirements :
pip install apache-sedona
pip install geopandas
pip install folium
Python script :
import sys
import folium
from pyspark import StorageLevel
import geopandas as gpd
import pandas as pd
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from sedona.spark import *
config = SedonaContext.builder() .\
config('spark.jars.packages',
'org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.1,'
'org.datasyslab:geotools-wrapper:1.4.0-28.2'). \
config("spark.serializer", KryoSerializer.getName). \
config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
getOrCreate()
sedona = SedonaContext.create(config)
sc = sedona.sparkContext
point_rdd = PointRDD(sc, "../sedona_data/arealm-small.csv", 1, FileDataSplitter.CSV, True, 10, StorageLevel.MEMORY_ONLY, "epsg:4326", "epsg:4326")
## Getting approximate total count
count_approx = point_rdd.approximateTotalCount
print("approximate count", count_approx)
# To run analyze please use function analyze
print("analyse is here", point_rdd.analyze())
# collect to Python list
points_List = point_rdd.rawSpatialRDD.collect()[:5]
print("Printng point list")
print(points_List)
point_rdd_to_geo = point_rdd.rawSpatialRDD.map(lambda x: [x.geom, *x.getUserData().split("\t")])
point_gdf = gpd.GeoDataFrame(
point_rdd_to_geo.collect(), columns=["geom", "attr1", "attr2", "attr3"], geometry="geom"
)
print("GeoPandas Dataframe")
print(point_gdf[:5])
spatial_df = Adapter.\
toDf(point_rdd, ["attr1", "attr2", "attr3"], sedona).\
createOrReplaceTempView("spatial_df")
spatial_gdf = sedona.sql("Select attr1, attr2, attr3, geometry as geom from spatial_df")
spatial_gdf.show(5, False)
## Apache Sedona spatial partitioning method can
# significantly speed up the join query.
# Three spatial partitioning methods are available:
# KDB-Tree, Quad-Tree and R-Tree.
# Two SpatialRDD must be partitioned by the same way.
## Very Important : Choose the Same Spatial Paritioning Everywhere. in this case GridType.KDBTREE
done_value = point_rdd.spatialPartitioning(GridType.KDBTREE)
print("spatial partitioniing done value = ", done_value)
polygon_rdd = PolygonRDD(sc, "../sedona_data/primaryroads-polygon.csv", FileDataSplitter.CSV, True, 11, StorageLevel.MEMORY_ONLY, "epsg:4326", "epsg:4326")
print(polygon_rdd.analyze())
polygon_rdd.spatialPartitioning(point_rdd.getPartitioner()) ### This retrieves the same paritioner i.e. GridType.KDBTREE
# building an index
point_rdd.buildIndex(IndexType.RTREE, True)
polygon_rdd.buildIndex(IndexType.RTREE, True)
# Perform Spatial Join Query
result = JoinQuery.SpatialJoinQueryFlat(point_rdd, polygon_rdd, True, False)
print("Result of the Join Query")
print(result.take(2))
schema = StructType(
[
StructField("geom_left", GeometryType(), False),
StructField("geom_right", GeometryType(), False)
]
)
# Set verifySchema to False
spatial_join_result = result.map(lambda x: [x[0].geom, x[1].geom])
sedona_df = sedona.createDataFrame(spatial_join_result, schema, verifySchema=False)
print("Schema of Sedona Dataframe")
print(sedona_df.printSchema())
print("Values here")
sedona_df.show(n=10, truncate=False)
gdf = gpd.GeoDataFrame(sedona_df.rdd.collect(), columns=["geom_left", "geom_right"], geometry="geom_left")
gdf_points = gpd.GeoDataFrame(sedona_df.rdd.collect(), columns=["geom_left", "geom_right"], geometry="geom_right")
folium_map = folium.Map(zoom_start=0.0001, tiles="CartoDB positron")
for _, r in gdf_points.iterrows():
# Without simplifying the representation of each borough,
# the map might not be displayed
sim_geo = gpd.GeoSeries(r["geom_right"]).simplify(tolerance=0.000001)
geo_j = sim_geo.to_json()
geo_j = folium.GeoJson(data=geo_j, style_function=lambda x: {"fillColor": "red"})
geo_j.add_to(folium_map)
for _, r in gdf.iterrows():
# Without simplifying the representation of each borough,
# the map might not be displayed
sim_geo = gpd.GeoSeries(r["geom_left"]).simplify(tolerance=0.000001)
geo_j = sim_geo.to_json()
geo_j = folium.GeoJson(data=geo_j, style_function=lambda x: {"fillColor": "yellow"})
geo_j.add_to(folium_map)
folium_map.show_in_browser()
Output :
approximate count 3000
analyse is here True
Printng point list
[Geometry: Point userData: testattribute0 testattribute1 testattribute2, Geometry: Point userData: testattribute0 testattribute1 testattribute2, Geometry: Point userData: testattribute0 testattribute1 testattribute2, Geometry: Point userData: testattribute0 testattribute1 testattribute2, Geometry: Point userData: testattribute0 testattribute1 testattribute2]
GeoPandas Dataframe
geom attr1 attr2 attr3
0 POINT (-88.33149 32.32414) testattribute0 testattribute1 testattribute2
1 POINT (-88.17593 32.36076) testattribute0 testattribute1 testattribute2
2 POINT (-88.38895 32.35707) testattribute0 testattribute1 testattribute2
3 POINT (-88.22110 32.35078) testattribute0 testattribute1 testattribute2
4 POINT (-88.32399 32.95067) testattribute0 testattribute1 testattribute2
+--------------+--------------+--------------+----------------------------+
|attr1 |attr2 |attr3 |geom |
+--------------+--------------+--------------+----------------------------+
|testattribute0|testattribute1|testattribute2|POINT (-88.331492 32.324142)|
|testattribute0|testattribute1|testattribute2|POINT (-88.175933 32.360763)|
|testattribute0|testattribute1|testattribute2|POINT (-88.388954 32.357073)|
|testattribute0|testattribute1|testattribute2|POINT (-88.221102 32.35078) |
|testattribute0|testattribute1|testattribute2|POINT (-88.323995 32.950671)|
+--------------+--------------+--------------+----------------------------+
only showing top 5 rows
spatial partitioniing done value = True
True
Result of the Join Query
[[Geometry: Polygon userData: , Geometry: Point userData: testattribute0 testattribute1 testattribute2], [Geometry: Polygon userData: , Geometry: Point userData: testattribute0 testattribute1 testattribute2]]
Schema of Sedona Dataframe
root
|-- geom_left: geometry (nullable = false)
|-- geom_right: geometry (nullable = false)
None
Values here
+------------------------------------------------------------------------------------------------------------------------+----------------------------+
|geom_left |geom_right |
+------------------------------------------------------------------------------------------------------------------------+----------------------------+
|POLYGON ((-88.40049 30.474455, -88.40049 30.692167, -88.006968 30.692167, -88.006968 30.474455, -88.40049 30.474455)) |POINT (-88.35457 30.634836) |
|POLYGON ((-88.40047 30.474213, -88.40047 30.691941, -88.007269 30.691941, -88.007269 30.474213, -88.40047 30.474213)) |POINT (-88.35457 30.634836) |
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.347036 32.454904)|
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.39203 32.507796) |
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.349276 32.548266)|
|POLYGON ((-88.403842 32.448773, -88.403842 32.737139, -88.099114 32.737139, -88.099114 32.448773, -88.403842 32.448773))|POINT (-88.329313 32.618924)|
|POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032)) |POINT (-88.347036 32.454904)|
|POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032)) |POINT (-88.39203 32.507796) |
|POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032)) |POINT (-88.349276 32.548266)|
|POLYGON ((-88.403823 32.449032, -88.403823 32.737291, -88.09933 32.737291, -88.09933 32.449032, -88.403823 32.449032)) |POINT (-88.329313 32.618924)|
+------------------------------------------------------------------------------------------------------------------------+----------------------------+
only showing top 10 rows
Your map should have been opened in your browser automatically.
Following is the map opened in browser.