Search code examples
pythonapache-sparkdelta-live-tables

PySpark and delta tables merge is not supported temporarily


I get this error in PySpark when I try to sql merge a delta table:

py4j.protocol.Py4JJavaError: An error occurred while calling o64.sql.
: java.lang.UnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:744)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:875)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:71)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
        at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:71)
        at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:504)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$2(QueryExecution.scala:165)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:224)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:224)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:165)
        at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:75)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:158)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:158)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$2(QueryExecution.scala:178)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:224)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:224)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:175)
        at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:75)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:171)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:171)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:308)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:606)
        at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:308)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:323)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:277)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:256)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:102)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:112)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:519)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:83)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:519)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:495)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:108)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:95)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:93)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:221)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)

Here is the sql merge query:


merge into re_obu_logistic_status_cockpit_report
        using (
                select *
                from   re_data_to_merge
              ) info_to_merge
        on    (
                re_obu_logistic_status_cockpit_report.date = info_to_merge.date
                and re_obu_logistic_status_cockpit_report.status_id = info_to_merge.status_id
                and re_obu_logistic_status_cockpit_report.materialnumber = info_to_merge.materialnumber
                and re_obu_logistic_status_cockpit_report.sales_partner_id = info_to_merge.sales_partner_id

              )
        when  matched
          then update set re_obu_logistic_status_cockpit_report.status = info_to_merge.status
                         ,re_obu_logistic_status_cockpit_report.sales_partner_name = info_to_merge.sales_partner_name
                         ,re_obu_logistic_status_cockpit_report.obu_count = info_to_merge.obu_count
                         ,re_obu_logistic_status_cockpit_report.obu_count_pct = info_to_merge.obu_count_pct
                         ,re_obu_logistic_status_cockpit_report.dan_last_changed_tsz = info_to_merge.dan_last_changed_tsz
                         ,re_obu_logistic_status_cockpit_report.dan_last_changed_epoch = info_to_merge.dan_last_changed_epoch

        when  not matched
          then insert (
                        re_obu_logistic_status_cockpit_report.date
                       ,re_obu_logistic_status_cockpit_report.status_id
                       ,re_obu_logistic_status_cockpit_report.status
                       ,re_obu_logistic_status_cockpit_report.materialnumber
                       ,re_obu_logistic_status_cockpit_report.sales_partner_id
                       ,re_obu_logistic_status_cockpit_report.sales_partner_name
                       ,re_obu_logistic_status_cockpit_report.obu_count
                       ,re_obu_logistic_status_cockpit_report.obu_count_pct
                       ,re_obu_logistic_status_cockpit_report.dan_last_changed_tsz
                       ,re_obu_logistic_status_cockpit_report.dan_last_changed_epoch

                      )
          values      (
                        info_to_merge.date
                       ,info_to_merge.status_id
                       ,info_to_merge.status
                       ,info_to_merge.materialnumber
                       ,info_to_merge.sales_partner_id
                       ,info_to_merge.sales_partner_name
                       ,info_to_merge.obu_count
                       ,info_to_merge.obu_count_pct
                       ,info_to_merge.dan_last_changed_tsz
                       ,info_to_merge.dan_last_changed_epoch

                      )

I restarted a new EMR cluster, removed the delta tables from S3, then I relaunched the python application. I also restarted the EC2 instance asking the EMR cluster to run the application.


Solution

  • This was due to wrong " character in the spark-submit command: The wrong spark-submit was:

    spark-submit \
      --master yarn \
      --driver-memory 8g \
      --executor-memory 8g \
      --executor-cores 8 \
      --packages io.delta:delta-core_2.12:2.0.0 \
      --conf “spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension” \
      --conf “spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog” \
      --conf “spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED” \
      --conf “spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED” \
      --conf “spark.sql.legacy.timeParserPolicy=LEGACY” \
      --conf spark.rpc.askTimeout=600s \
      main.py
    

    The correct spark-submit is:

    spark-submit \
      --master yarn \
      --driver-memory 8g \
      --executor-memory 8g \
      --executor-cores 8 \
      --packages io.delta:delta-core_2.12:2.0.0 \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
      --conf "spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED" \
      --conf "spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED" \
      --conf "spark.sql.legacy.timeParserPolicy=LEGACY" \
      --conf spark.rpc.askTimeout=600s \
      main.py