Search code examples
postgresqljdbcapache-sparkapache-spark-sqlhstore

How to write into PostgreSQL hstore using Spark Dataset


I'm trying to write a Spark Dataset into an existent postgresql table (can't change the table metadata like column types). One of the columns of this table is of type HStore and it's causing trouble.

I see the following exception when I launch the write (here the original map is empty which when escaped gives an empty string):

Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO part_d3da09549b713bbdcd95eb6095f929c8 (.., "my_hstore_column", ..) VALUES (..,'',..) was aborted.  Call getNextException to see the cause.
    at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:136)
    at org.postgresql.core.v3.QueryExecutorImpl$1.handleError(QueryExecutorImpl.java:419)
    at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleError(QueryExecutorImpl.java:308)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2004)
    at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1187)
    at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1212)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:351)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:1019)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:222)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:300)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:299)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.postgresql.util.PSQLException: ERROR: column "my_hstore_column" is of type hstore but expression is of type character varying

This is how I'm doing it:

def escapePgHstore[A, B](hmap: Map[A, B]) = {
  hmap.map{case(key, value) => s""" "${key}"=>${value} """}.mkString(",")
}
...
val props = new Properties()
props.put("user", "xxxxxxx")
props.put("password", "xxxxxxx")

ds.withColumn("my_hstore_column", escape_pg_hstore_udf($"original_column"))
  .drop("original_column")
  .coalesce(1).write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .option("driver", "org.postgresql.Driver")
  .jdbc(jdbcUrl, hashedTablePartName, props)

If I don't escape the original_column from Map[String, Long] to String using escapePgHstore I see the following errors:

java.lang.IllegalArgumentException: Can't get JDBC type for map<string,bigint>
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType(JdbcUtils.scala:136)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:293)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:292)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:292)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:441)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

What's the right way to make spark write a valid hstore data type??


Solution

  • It turns out I have just to let postgres try to guess the appropriate type of my column. By setting stringtype to unspecified in the connection string as described in the official documentation.

    props.put("stringtype", "unspecified")
    

    Now it works perfectly !!