pyspark, google-cloud-dataproc, apache-iceberg, google-cloud-dataproc-metastore

Iceberg - MERGE INTO TABLE is not supported temporarily


I tried to merge data from a parquet file into an Iceberg table, but it failed with java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily. I am using Spark 3.3.0 with Iceberg 1.1.0 on a Dataproc cluster that is attached to a Dataproc Metastore service. This is the command I used to create the cluster:

gcloud dataproc clusters create de-dev-dp --image-version=2.1 --region asia-southeast2 \
 --project de-dev --dataproc-metastore projects/de-dev/locations/asia-southeast2/services/de-dev-dpms  \
 --max-idle=30m --num-masters=1 --master-machine-type n2-standard-2  \
 --properties='^#^dataproc:pip.packages=google-cloud-secret-manager==2.15.0,numpy==1.24.1#spark:spark.jars=https://jdbc.postgresql.org/download/postgresql-42.5.1.jar,https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.3_2.13/1.1.0/iceberg-spark-runtime-3.3_2.13-1.1.0.jar#core:fs.defaultFS=gs://de-dev-dataproc-fs'  \
 --subnet projects/de-dev/regions/asia-southeast2/subnetworks/subnet-de-dev-gke-dp  \
 --service-account=de-dev-ga-dataproc@de-dev.iam.gserviceaccount.com  \
 --master-boot-disk-type pd-ssd --master-boot-disk-size 30GB --num-workers=2 \
 --worker-machine-type n2-standard-2   --worker-boot-disk-type pd-ssd \
 --worker-boot-disk-size 30GB   --bucket de-dev-adhoc --temp-bucket de-dev-adhoc \
 --no-address   --optional-components=JUPYTER --enable-component-gateway

Here is the Python code:

from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import LongType, BooleanType, StructField, StructType


conf = (
        SparkConf()
        .setAppName('changelog')
        # Iceberg's SQL extensions add MERGE INTO / UPDATE / DELETE support
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        # Wrap the built-in session catalog so existing Hive tables keep working
        .set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkSessionCatalog')
        .set('spark.sql.catalog.spark_catalog.type', 'hive')
        # Separate Iceberg catalog named "dev", backed by the Hive metastore
        .set('spark.sql.catalog.dev', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.dev.type', 'hive')
    )
spark = SparkSession.builder.enableHiveSupport().config(conf=conf).getOrCreate()

df = spark.read.option("mergeSchema", "true").parquet('gs://poc-kafka/mis').sort("timestamp")

# Create an empty Iceberg v2 table matching the changelog payload schema,
# plus SCD2 bookkeeping columns (start_at, end_at, is_current)
schema = df.select("value.after.*").schema
additional_fields = [StructField('start_at', LongType(), True),
                     StructField('end_at', LongType(), True),
                     StructField('is_current', BooleanType(), True)]
schema = StructType(schema.fields + additional_fields)
empty_df = spark.createDataFrame([], schema)
spark.sql("CREATE DATABASE IF NOT EXISTS dev.dummy")
empty_df.createOrReplaceTempView("empty")
spark.sql("CREATE TABLE IF NOT EXISTS dev.dummy.fruit USING iceberg  "
         "TBLPROPERTIES ('format-version'='2') "
         "AS (SELECT * FROM empty) ")

# Merge table
df.createOrReplaceTempView("changelog")
sql = "MERGE INTO dev.dummy.fruit d " \
      "USING (SELECT value.after.*, value.op op, " \
      "            value.source.ts_ms start_at, True is_current, " \
      "            NULL end_at " \
      "        FROM changelog) s " \
      "ON d.id = s.id " \
      "WHEN MATCHED THEN UPDATE SET d.is_current = False, d.end_at = s.start_at " \
      "WHEN NOT MATCHED THEN INSERT * "
spark.sql(sql)

And this is the traceback:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/tmp/ipykernel_31790/3236470120.py in <cell line: 1>()
----> 1 spark.sql(sql)

/usr/lib/spark/python/pyspark/sql/session.py in sql(self, sqlQuery, **kwargs)
   1032             sqlQuery = formatter.format(sqlQuery, **kwargs)
   1033         try:
-> 1034             return DataFrame(self._jsparkSession.sql(sqlQuery), self)
   1035         finally:
   1036             if len(kwargs) > 0:

/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    188     def deco(*a: Any, **kw: Any) -> Any:
    189         try:
--> 190             return f(*a, **kw)
    191         except Py4JJavaError as e:
    192             converted = convert_exception(e.java_exception)

/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o67.sql.
: java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:891)
    at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:821)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
    at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:69)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:459)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:145)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:145)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:158)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:158)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)

I am not sure whether this comes from a misconfiguration on my side in Spark or from how I am using MERGE INTO. Either way, it would be great if someone could point out what I did wrong or missed. Thanks in advance.


Solution

  • I found out it was caused by an incompatible Iceberg jar file: Dataproc image 2.1 ships Spark built with Scala 2.12, while the jar I used (iceberg-spark-runtime-3.3_2.13) was built for Scala 2.13. Because of that binary mismatch the Iceberg SQL extensions never load, so MERGE INTO falls through to Spark's built-in planner, which raises this error. I had not realized the jars differ per Scala version. Hopefully this helps others facing a similar issue.
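
For reference, a minimal sketch of the corrected spark.jars fragment for the --properties flag, assuming the same Iceberg 1.1.0 release (only the Scala suffix changes from _2.13 to _2.12):

spark:spark.jars=https://jdbc.postgresql.org/download/postgresql-42.5.1.jar,https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.1.0/iceberg-spark-runtime-3.3_2.12-1.1.0.jar

If you are unsure which Scala build a cluster needs, PySpark's py4j bridge can ask the JVM directly. This diagnostic is my own sketch, not from the original post:

# Print the Scala version the cluster's Spark was built against; the
# Iceberg runtime jar's _2.12/_2.13 suffix must match it.
print(spark.sparkContext._jvm.scala.util.Properties.versionString())
# On Dataproc image 2.1 this prints something like "version 2.12.x",
# so only the _2.12 Iceberg runtime jar will load correctly.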