Tags: azure, azure-cosmosdb, databricks, azure-databricks

Py4JJavaError: An error occurred while calling o3858.save. : org.apache.spark.SparkException: Writing job aborted


I'm trying to write data into Cosmos DB from the DataFrame df_u. I have defined the configuration in writeMcgMd. I'm using Spark version 3.2.1.

Code:

  df_u.write.format("cosmos.oltp").options(**writeMcgMd).mode("append").save()

Configuration used:

writeMcgMd = {
  "spark.cosmos.accountEndpoint" : "https://cccc.azure.com:443/",
  "spark.cosmos.accountKey" : "ccc",
  "spark.cosmos.database" : "cccc",
  "spark.cosmos.container" : "ccc",
#   "spark.cosmos.write.strategy": "ItemOverwrite"
}

Error from the driver logs of the Databricks cluster (Standard error):

  wnWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:565)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:156)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:156)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:141)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:132)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:186)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:956)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:346)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)
  Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 17.0 failed 4 times, most recent failure: Lost task 2.3 in stage 17.0 (TID 200) (10.240.26.5 executor 0): java.lang.IllegalArgumentException: requirement failed: id is a mandatory field. But it is missing or it is not a string. Json: {"_attachments":"attachments/","databasename":"mwhcicm","lastloadtime":"01-01-1900","parentname":"Mary","query":"SELECT x, y, z, a, \tLEFT(Text, 4000) as Text, Active,\tUpdateDate,\tInsertDate,\tRefNoteSubjectID,'x' as sourcedb,getdate() as processdate FROM o ","recordtype":"metadata","schema":"dbo","tableName":"o","where_col1":" WHERE UpdateDate > ","where_col2":" OR InsertDate > "}
    at scala.Predef$.require(Predef.scala:281)
    at com.azure.cosmos.spark.ItemsDataWriteFactory$CosmosWriter.write(ItemsDataWriteFactory.scala:106)
    at com.azure.cosmos.spark.ItemsDataWriteFactory$CosmosWriter.write(ItemsDataWriteFactory.scala:71)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:436)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1689)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:474)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:375)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:825)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1655)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:828)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:683)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.lang.IllegalStateException: The Spark task was aborted, Context: SparkTaskContext(correlationActivityId=c27b3c5a-5038-4ce5-b1fa-9bb238917860,stageId=17,partitionId=2,taskAttemptId=200,details=)
      at com.azure.cosmos.spark.BulkWriter.abort(BulkWriter.scala:625)
      at com.azure.cosmos.spark.ItemsDataWriteFactory$CosmosWriter.abort(ItemsDataWriteFactory.scala:129)
      at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$6(WriteToDataSourceV2Exec.scala:470)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1700)
      ... 20 more

  Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2984)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2931)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2925)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2925)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1345)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1345)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1345)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3193)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3134)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3122)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1107)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2637)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2620)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:371)
    ... 46 more
  Caused by: java.lang.IllegalArgumentException: requirement failed: id is a mandatory field. But it is missing or it is not a string. Json:


Solution

  • java.lang.IllegalArgumentException: requirement failed: id is a mandatory field. But it is missing or it is not a string.

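    This error means that the JSON documents being written to Cosmos DB do not contain the required id field. Every Cosmos DB item must have an id property, and its value must be a string.

    Because the required "id" attribute is absent, the input is rejected as invalid. In the stack trace, the check fails inside the connector's writer (ItemsDataWriteFactory$CosmosWriter.write), so the row is rejected before anything is stored.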
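The rule the connector enforces can be mirrored in plain Python to spot offending records before calling save(). This is a minimal sketch, not the connector's own code: find_invalid_ids is a hypothetical helper, and the sample document keeps only some of the fields from the failing JSON in the error message.

```python
import json


def find_invalid_ids(docs):
    """Return the indexes of documents whose 'id' is missing or not a string.

    Hypothetical helper mirroring the rule in the connector's error message:
    "id is a mandatory field. But it is missing or it is not a string."
    """
    return [i for i, doc in enumerate(docs) if not isinstance(doc.get("id"), str)]


# A subset of the fields from the failing document in the error message.
failing = json.loads(
    '{"databasename": "mwhcicm", "schema": "dbo", "tableName": "o",'
    ' "recordtype": "metadata", "lastloadtime": "01-01-1900"}'
)
fixed = dict(failing, id="mwhcicm-dbo-o")  # string id: passes the check
numeric = dict(failing, id=42)             # id present but not a string: fails

print(find_invalid_ids([failing, fixed, numeric]))  # [0, 2]
```

For the DataFrame itself, the schema answers the same question without collecting rows: "id" in df_u.columns tells you whether the column exists, and dict(df_u.dtypes).get("id") == "string" whether it has the string type the connector requires. Null values in that column would still fail the check.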

    To fix this, add an id property with a string value to every document. For a DataFrame write, that means adding a string column named id to df_u before calling save().
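One way to produce that column is to derive the id deterministically from fields that identify a record. The sketch below assumes (the question does not say so) that databasename, schema and tableName together identify one metadata record; KEY_FIELDS and add_id are hypothetical names, and the code works on plain dicts so it runs without a cluster.

```python
import hashlib

# Assumption: these fields together identify one metadata record.
KEY_FIELDS = ("databasename", "schema", "tableName")


def add_id(doc, key_fields=KEY_FIELDS):
    """Return a copy of doc with a deterministic string 'id'.

    The key values are joined with '|' and hashed with SHA-256. The result is
    always a 64-character lowercase hex string, so it never contains characters
    such as '/' that Cosmos DB does not allow in ids.
    """
    natural_key = "|".join(str(doc[f]) for f in key_fields)
    return {**doc, "id": hashlib.sha256(natural_key.encode("utf-8")).hexdigest()}


doc = {"databasename": "mwhcicm", "schema": "dbo", "tableName": "o",
       "recordtype": "metadata"}
a = add_id(doc)
b = add_id(dict(doc))  # same key fields, so the same id
print(a["id"] == b["id"], len(a["id"]))  # True 64
```

In PySpark the same idea is a single column expression, for example df_u.withColumn("id", F.sha2(F.concat_ws("|", "databasename", "schema", "tableName"), 256)) with pyspark.sql.functions imported as F (concat_ws skips null values, whereas this sketch renders them as "None"). A deterministic id means a re-run writes to the same items, which the ItemOverwrite write strategy (commented out in the question's configuration) treats as upserts. If no natural key exists, F.expr("uuid()") yields a random string id, but every run then inserts new items.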

    We reproduced the same scenario in our environment using this approach, and the write succeeded.

    Make sure the Azure Cosmos DB Spark connector library is installed on the cluster.


    Reference: Missing the ID property.