pyspark, apache-spark-sql, geometry, apache-sedona

Task stuck at "GET RESULT" after join -> group by in Spark (Sedona)


I'm working with Spark 3.4.1, Sedona, and JupyterLab in a Docker image.

I'm trying to find the month with the most vessel points in a large dataset of ship positions with geographic information. I have two dataframes: one with the points (lon, lat, vessel name, date, etc.) and another with polygons (multipolygon, polygon name, plus additional irrelevant info) loaded from shapefiles.

The idea is to join the two dataframes with the "ST_Contains" function, then group by polygon name and count the number of points inside each polygon.

The points are in a CSV file, which I read without problems, and the same goes for the polygon file with the areas. The areas file has only 60 entries, and the point file I'm currently using has about 5M entries. I want to test with a small number of entries before I use the complete point dataset (5475M), but when I launch the join in Spark with the query below, the task just stays at "GET RESULT" and eventually fails with the error TaskResultLost (result lost from block manager).

most_point = sedona.sql(
"""
SELECT NAME, count(*) as cnt
FROM points p
inner join shape s on ST_Contains(s.geometry, ST_Point(p.LON, p.LAT))
group by NAME
  """)
most_point.show()
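
For checking the query's output on a handful of rows, the intended result (a count of points per polygon name) can be sketched in plain Python. This is only an illustration of the join-then-count logic, not how Sedona evaluates ST_Contains: it assumes each area is a single simple ring of (lon, lat) vertices rather than a multipolygon, and the names `point_in_polygon`, `count_points_per_area`, and the sample data are made up for the example.

```python
from collections import Counter


def point_in_polygon(lon, lat, ring):
    """Ray-casting test: True if (lon, lat) lies inside the ring of (x, y) vertices."""
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        # Only edges that straddle the horizontal line through the point matter.
        if (y1 > lat) != (y2 > lat):
            # x coordinate where the edge crosses that horizontal line.
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lon < x_cross:
                inside = not inside
    return inside


def count_points_per_area(points, areas):
    """Count points per area name (the join plus group-by, done naively)."""
    counts = Counter()
    for lon, lat in points:
        for name, ring in areas.items():
            if point_in_polygon(lon, lat, ring):
                counts[name] += 1
    return counts


# Hypothetical sample data: two square areas and four points.
areas = {
    "A": [(0, 0), (2, 0), (2, 2), (0, 2)],
    "B": [(3, 0), (5, 0), (5, 2), (3, 2)],
}
points = [(1, 1), (1.5, 0.5), (4, 1), (10, 10)]
result = count_points_per_area(points, areas)
```

The nested loop makes the cost visible: every point is compared against every area, which is why the size Spark assumes for each side of the join matters so much.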

[Screenshots: executor status; error in Jupyter Lab]

I've tried adding more executors and memory, changing "spark.shuffle.blockTransferService" to "nio", and rewriting the query in more ways than I can remember.

When I change the query so that the areas file is the "left" table, it runs really slowly but finishes in about 2 hours on only 1 executor. Every time I try to parallelize, though, I get the TaskResultLost error.

I'm guessing that some port may be blocked, but I don't have experience with Spark. This is for a university class and the professor is nowhere to be seen.

Thank you!


Solution

  • In case anyone else runs into this error: Spark assumed that the small file (areas), with 60 entries, was a very large dataset.

    The solution was to add .cache() to the end of the query that creates the temporary view of this file. After that, I was able to run the join without problems.

    Something like this:

    small_file = sedona.sql(
    """select * from table""").cache()
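
A slightly fuller sketch of the same idea, under some assumptions: `session` is the `sedona` session from the question, and the helper name `cache_as_view` and the view names in the usage comment are hypothetical. Since `cache()` is lazy, the sketch calls `count()` to force the small table to be materialized before it is registered as a view. Whether this changes the join plan in a given setup is not guaranteed; it mirrors what worked here.

```python
def cache_as_view(session, query, view_name):
    """Run `query`, cache the result, and register it as a temporary view."""
    df = session.sql(query).cache()
    # cache() only marks the DataFrame; an action such as count()
    # actually materializes the cached data.
    df.count()
    df.createOrReplaceTempView(view_name)
    return df


# Usage with a live session (view names are placeholders):
# shape = cache_as_view(sedona, "select * from shape_raw", "shape")
# The join query from the question can then refer to "shape" as before.
```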