When run manually from the notebook (under either my credentials or the Managed Identity), the Spark notebook completes successfully in about 3 minutes.
When placed in a Pipeline, it ran fine prior to Tuesday.
Since Tuesday, it fails when run from the Pipeline.
It uses the same Spark Pool as the successful manual runs.
There were no changes to the notebook's code prior to the sudden failure; the committed version is identical to the one that runs successfully.
It appears to time out after 2 hours on the last step (an INSERT).
Error Message:
{
"errorCode": "6002",
"message": "Error: Job aborted.\norg.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)\norg.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:284)\norg.apache.spark.sql.delta.files.TransactionalWrite.$anonfun$writeFiles$1(TransactionalWrite.scala:456)\norg.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)\norg.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)\norg.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)\norg.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)\norg.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)\norg.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:391)\norg.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:355)\norg.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:103)\norg.apache.spark.sql.delta.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:221)\norg.apache.spark.sql.delta.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:218)\norg.apache.spark.sql.delta.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:103)\norg.apache.spark.sql.delta.commands.WriteIntoDelta.write(WriteIntoDelta.scala:337)\norg.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:98)\norg.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:91)\norg.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:252)\norg.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:91)\norg.apache.spark.sql.delta.catalog.WriteIntoDeltaBuilder$$anon$1$$anon$2.insert(DeltaTableV2.scala:279)\norg.apache.spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1(V1FallbackWriters.scala:79)\norg.apache.
spark.sql.execution.datasources.v2.SupportsV1Write.writeWithV1$(V1FallbackWriters.scala:78)\norg.apache.spark.sql.execution.datasources.v2.AppendDataExecV1.writeWithV1(V1FallbackWriters.scala:34)\norg.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run(V1FallbackWriters.scala:66)\norg.apache.spark.sql.execution.datasources.v2.V1FallbackWriters.run$(V1FallbackWriters.scala:65)\norg.apache.spark.sql.execution.datasources.v2.AppendDataExecV1.run(V1FallbackWriters.scala:34)\norg.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)\norg.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)\norg.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)\norg.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:113)\norg.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)\norg.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)\norg.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)\norg.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)\norg.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)\norg.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:113)\norg.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)\norg.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)\norg.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)\norg.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)\norg.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$tr
ansformDownWithPruning(LogicalPlan.scala:31)\norg.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)\norg.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)\norg.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)\norg.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)\norg.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)\norg.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)\norg.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:90)\norg.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:84)\norg.apache.spark.sql.Dataset.<init>(Dataset.scala:231)\norg.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)\norg.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)\norg.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)\norg.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)\norg.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)\norg.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)\norg.apache.livy.repl.SQLInterpreter.execute(SQLInterpreter.scala:129)\norg.apache.livy.repl.Session.$anonfun$executeCode$1(Session.scala:716)\nscala.Option.map(Option.scala:230)\norg.apache.livy.repl.Session.executeCode(Session.scala:713)\norg.apache.livy.repl.Session.$anonfun$execute$4(Session.scala:492)\norg.apache.livy.repl.Session.withRealtimeOutputSupport(Session.scala:936)\norg.apache.livy.repl.Session.$anonfun$execute$1(Session.scala:492)\nscala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\nscala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)\nscala.util.Success.$anonfun$map$1(Try.scala:255)\nscala.util.Success.map(Try.scala:213)\nscala.concurrent.Future.$anonf
un$map$1(Future.scala:292)\nscala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)\nscala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)\nscala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)\njava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\njava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\njava.lang.Thread.run(Thread.java:750)",
"failureType": "UserError",
"target": "Rebuild_factGL",
"details": []
I've tried:
How can I troubleshoot this further?
Solved-ish.
TLDR: Use PARSER_VERSION = '2.0' when doing OPENROWSET on CSV files.
Approximately one week after the Spark notebook stopped working in the Pipeline, it also stopped working when run manually. I'm not sure why the failure was delayed, but at least the behavior was now consistent.
I isolated the problem to a serverless SQL view that the Spark notebook was calling.
That view had suddenly gone from running in 1.5 minutes, to timing out.
In particular, the view would time out only when three conditions were met: selecting both of the numeric columns, plus including a WHERE clause on the date. With any two of the three, it ran in 1.5 minutes; with all three, it timed out.
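To illustrate the shape of the query that triggered the timeout — a sketch with hypothetical table and column names, not the actual schema:

```sql
-- Ran in ~1.5 minutes with any two of the three elements below;
-- timed out whenever all three were present (names are hypothetical):
SELECT DebitAmount,       -- numeric column 1
       CreditAmount       -- numeric column 2
FROM dbo.vw_GLDetail
WHERE PostingDate >= '2023-01-01';   -- date filter
```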
This behavior was reproducible for 3 weeks.
I was never able to pin down exactly what was occurring. But all of the view's underlying tables ultimately came from CSV files in the data lake. I changed all of them to PARSER_VERSION = '2.0', and now it runs, faster than it did before the hiccup.
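For reference, this is the kind of change I mean — a minimal sketch with hypothetical paths, view name, and columns; only the PARSER_VERSION = '2.0' argument is the actual fix:

```sql
-- Serverless SQL view over CSV files in the data lake.
-- PARSER_VERSION = '2.0' switches OPENROWSET to the faster native CSV parser
-- (the default, '1.0', was what the view was using before).
CREATE OR ALTER VIEW dbo.vw_GLDetail AS
SELECT *
FROM OPENROWSET(
        BULK 'https://mydatalake.dfs.core.windows.net/raw/gl/*.csv',  -- hypothetical path
        FORMAT = 'CSV',
        PARSER_VERSION = '2.0',   -- the fix
        HEADER_ROW = TRUE
    )
    WITH (
        AccountKey   VARCHAR(20),
        PostingDate  DATE,
        DebitAmount  DECIMAL(18, 2),
        CreditAmount DECIMAL(18, 2)
    ) AS rows;
```

Note that HEADER_ROW is only supported with parser version 2.0, so any queries relying on ordinal column positions may need the explicit WITH schema shown above.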