My application is designed to receive streaming data and insert it into a database using PySpark and Iceberg. After a short period of streaming, I ran
spark.sql("CALL local.system.rewrite_data_files('local.db.table')")
Then, I ran this:
timestamp_ms = int(datetime.datetime.now().timestamp() * 1000)
spark.sql(f"CALL local.system.expire_snapshots('local.db.table', {timestamp_ms})")
The first time, it worked fine, but on the second attempt, when I ran
rewrite_data_files
again, I received the following warning:
24/11/04 15:07:12 WARN Tasks: Retrying task after failure: Version 384 already exists: warehouse/db/table/metadata/v384.metadata.json
org.apache.iceberg.exceptions.CommitFailedException: Version 384 already exists: warehouse/db/table/metadata/v384.metadata.json
at org.apache.iceberg.hadoop.HadoopTableOperations.renameToFinal(HadoopTableOperations.java:369)
at org.apache.iceberg.hadoop.HadoopTableOperations.commit(HadoopTableOperations.java:162)
at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:408)
at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:382)
at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:233)
at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:84)
at org.apache.iceberg.spark.source.SparkWrite$BatchAppend.commit(SparkWrite.java:301)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:399)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:359)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:225)
at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:337)
at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:336)
at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:196)
at org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:150)
at jdk.internal.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:1583)
After a bit of searching, I found an answer. The main point is that this is just a warning indicating that some concurrent updates are happening. The commit will be retried and eventually succeed.