Search code examples
apache-sparkamazon-s3aws-glueapache-iceberg

Unable to save partitioned data in in iceberg format when using s3 and glue


Getting the following error-

 java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'year=2022/month=10/day=8/hour=12' in spec [
  1000: year: identity(24)
  1001: month: identity(25)
  1002: day: identity(26)
  1003: hour: identity(27)
]
        at org.apache.iceberg.io.ClusteredWriter.write(ClusteredWriter.java:96)
        at org.apache.iceberg.io.ClusteredDataWriter.write(ClusteredDataWriter.java:31)
        at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:758)
        at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:728)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:442)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

This is the query i am running on spark 3.3, with glue catalog and saving to s3. The iceberg version is 1.1.0 -

USING iceberg
PARTITIONED BY (year, month, day, hour)
AS SELECT * from data

But when I try to save the data without partitioning, it works without any problems -

CREATE TABLE my_catalog.test.iceberg_test
USING iceberg
PARTITIONED BY (year, month, day, hour)
AS SELECT * from data 

How do I fix this?


Solution

  • According to the docs, the data needs to be sorted before saving it -

    Iceberg requires the data to be sorted according to the partition spec per task (Spark partition) in prior to write against partitioned table.This applies both Writing with SQL and Writing with DataFrames.

    So this is how I fixed the issue -

    df = spark.read.orc("s3a://...")
    df = df.sortWithinPartitions("year", "month", "day", "hour")
    df.createOrReplaceTempView("data")
    

    and then ran the partitioned sql query without any problem.