apache-spark amazon-emr thrift dbt nessie

Syntax error when using Nessie commands with DBT but not using Spark


We are trying to set up an environment using AWS EMR (on EC2), DBT, Spark and Nessie.

Even though all the extensions are installed correctly, and Nessie commands like 'CREATE BRANCH' work on the cluster from Jupyter and directly from Spark, they do not work when run through DBT.

Regular SQL commands work and return responses as intended, but when I try to create a branch or use one, I get a parsing error.

I am using the latest versions I could (dbt 1.5.2 on EMR 6.10).

Here is the stack trace:

ERROR SparkExecuteStatementOperation: Error executing query with a1aab108-eaaa-4c48-9951-5959ca24a038, currentState RUNNING,
org.apache.spark.sql.catalyst.parser.ParseException:
Syntax error at or near 'demo_branch2'(line 3, pos 22)

== SQL ==
/* {"app": "dbt", "dbt_version": "1.5.2", "profile_name": "thrift_tests", "target_name": "dev", "node_id": "model.thrift_tests.my_first_dbt_model"} */

        use reference demo_branch2 in dev_catalog
----------------------^^^


        at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:89) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser.parsePlan(IcebergSparkSqlExtensionsParser.scala:133) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-0.jar:?]
        at org.apache.spark.sql.catalyst.parser.extensions.NessieSparkSqlExtensionsParser.parsePlan(NessieSparkSqlExtensionsParser.scala:114) ~[org.projectnessie.nessie-integrations_nessie-spark-extensions-3.3_2.12-0.51.1.jar:?]
        at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:620) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:620) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:291) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:230) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:230) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:225) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_372]
        at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_372]
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878) ~[hadoop-client-api-3.3.3-amzn-2.jar:?]
        at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:239) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_372]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
        at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_372]

EMR (6.10) configuration:

[
  {
    "Classification": "iceberg-defaults",
    "Properties": {
      "iceberg.enabled": "true"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.3_2.12:0.63.0",
      "spark.sql.catalog.dev_catalog": "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.dev_catalog.auth_type": "NONE",
      "spark.sql.catalog.dev_catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
      "spark.sql.catalog.dev_catalog.ref": "main",
      "spark.sql.catalog.dev_catalog.uri": "https://nessie-dev.dev.XYZ.cloud/api/v1",
      "spark.sql.catalog.dev_catalog.warehouse": "s3://.../nessie_catalog/",
      "spark.sql.defaultCatalog": "dev_catalog",
      "spark.sql.execution.pyarrow.enabled": "true",
      "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions"
    }
  }
]

I tried running commands that are not related to Nessie, and they were successful.

I tried running the Nessie commands directly in Spark, and they worked.

Everything works as intended from Jupyter.

I tried changing the Thrift server configuration to skip validation. That suggestion came from GPT; I could not find such an option myself, so it may have been invented.

I tried running Iceberg commands using DBT, and they work.


Solution

  • Note that the SQL sent to Spark is prefixed with the following comment:

    /* {"app": "dbt", "dbt_version": "1.5.2", "profile_name": "thrift_tests", "target_name": "dev", "node_id": "model.thrift_tests.my_first_dbt_model"} */
    

    This comment causes no problems when the statement is standard SQL, but it does cause the parse error with Nessie commands such as USE REFERENCE and LIST REFERENCES.

    To work around this, we overrode the execute function of Cursor in dbt.adapters.spark.session so that it removes the comment before running the statement.

    We did it as follows:

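    from typing import Any

    from dbt.adapters.spark.session import Cursor
    from dbt.exceptions import DbtRuntimeError
    from pyspark.sql.utils import AnalysisException

    original_execute = Cursor.execute

    def execute(self, sql: str, *parameters: Any) -> None:
        # Drop dbt's leading /* ... */ query comment, but only if it is present.
        stripped = sql.lstrip()
        if stripped.startswith("/*") and "*/" in stripped:
            sql = stripped.split("*/", 1)[1].lstrip()
        try:
            original_execute(self, sql, *parameters)
        except AnalysisException as exc:
            raise DbtRuntimeError(str(exc)) from exc
    Cursor.execute = execute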
    

    After applying the override above, we run DBT from code and it succeeds. The AnalysisException wrapper is there because of a different bug that is listed in the dbt-spark repo.
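
The comment-stripping step can also be tried on its own, without dbt or Spark installed. The following is a minimal sketch, not part of the original fix: the helper name strip_leading_comment and the sample statements are made up for illustration. It removes one leading /* ... */ block and leaves a statement untouched when it does not start with a comment.

```python
import re

# Matches a single leading /* ... */ block comment and the whitespace around it.
# Without re.MULTILINE, "^" only anchors at the very start of the string.
_LEADING_COMMENT = re.compile(r"^\s*/\*.*?\*/\s*", re.DOTALL)


def strip_leading_comment(sql: str) -> str:
    """Return sql without one leading block comment, if there is one."""
    return _LEADING_COMMENT.sub("", sql, count=1)


# A statement shaped like the one in the stack trace (comment shortened).
prefixed = (
    '/* {"app": "dbt", "dbt_version": "1.5.2"} */\n\n'
    "        use reference demo_branch2 in dev_catalog"
)
# A statement with no leading comment must come back unchanged.
plain = "select 1 /* inline comment */ as one"

print(strip_leading_comment(prefixed))
print(strip_leading_comment(plain))
```

A helper like this could be called at the top of the overridden execute in place of inline string handling, which keeps the stripping logic easy to test separately.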