
How to enable PostGIS queries in Spark SQL


I have a PostgreSQL database with the PostGIS extension installed, so I can run queries like:

SELECT *
FROM poi_table
WHERE (ST_DistanceSphere(the_geom, ST_GeomFromText('POINT(121.37796 31.208297)', 4326)) < 6000)

And with Spark SQL, I can query the table from my Spark application (in Scala) like this:

spark.sql("select the_geom from poi_table where the_geom is not null").show

The problem is that Spark SQL doesn't support the PostGIS extension. For example, when I query the table using the PostGIS function ST_DistanceSphere, I get an error like this:

scala> spark.sql("select * FROM poi_table WHERE (ST_DistanceSphere(the_geom, ST_GeomFromText('POINT(121.37796 31.208297)', 4326)) < 60)")
org.apache.spark.sql.AnalysisException: Undefined function: 'ST_DistanceSphere'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 65
  at
...

With Python, I can create a PostgreSQL connection and send this query to the PostgreSQL server to execute it.

So, is there a similar workaround in Spark/Scala?
Or, even better, a jar I can use to make Spark SQL support the PostGIS extension?


Solution

  • With Python, I can create a PostgreSQL connection and send this query to the PostgreSQL server to execute it.

    You can do the same with Scala: open a plain JDBC connection (java.sql.{Connection, DriverManager}), run the query server-side, and read the result set, as sketched below.
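
    A minimal sketch, assuming a local PostgreSQL instance; the JDBC URL, user, and password are placeholders, not from the original question:

    import java.sql.DriverManager

    // Placeholder connection details -- adjust to your environment.
    val url = "jdbc:postgresql://localhost:5432/mydb"
    val conn = DriverManager.getConnection(url, "postgres", "secret")
    try {
      val stmt = conn.createStatement()
      // PostgreSQL executes this query itself, so PostGIS functions are available.
      val rs = stmt.executeQuery(
        """SELECT *
          |FROM poi_table
          |WHERE ST_DistanceSphere(the_geom,
          |        ST_GeomFromText('POINT(121.37796 31.208297)', 4326)) < 6000""".stripMargin)
      while (rs.next()) {
        println(rs.getString("the_geom")) // process each row as needed
      }
    } finally {
      conn.close()
    }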

    Or, even better, a jar I can use to make Spark SQL support the PostGIS extension?

    You cannot, because this is not a Postgres query: what you pass to spark.sql is parsed and executed by Spark itself, which knows nothing about PostGIS functions. What you can do is push the whole query down to PostgreSQL as a subquery:
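
    A sketch of the subquery approach (the JDBC URL and credentials are placeholders): wrap the PostGIS query in a parenthesized subquery with an alias and pass it as the dbtable option of Spark's JDBC source, so PostgreSQL evaluates ST_DistanceSphere before Spark ever sees the data:

    // Placeholder JDBC URL and credentials -- adjust to your environment.
    val query =
      """(SELECT *
        |FROM poi_table
        |WHERE ST_DistanceSphere(the_geom,
        |        ST_GeomFromText('POINT(121.37796 31.208297)', 4326)) < 6000) AS poi_sub""".stripMargin

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/mydb")
      .option("dbtable", query) // PostgreSQL runs the subquery; Spark only reads the result
      .option("user", "postgres")
      .option("password", "secret")
      .load()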

    This may fit your requirements, as long as the query doesn't have to be dynamic. Unfortunately, Spark SQL doesn't support geometry types either, so you may have to cast the geometry column to something Spark can consume (text, for example) or define your own JDBC dialect.