I have a PySpark job running on AWS Glue. The job processes the data and saves it as an Apache Iceberg table. The problem is that the write generates multiple small files within each partition. I tested several ways of saving the data, but none resolved the issue. Here is my code snippet:
import pyspark.sql.functions as f
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import DataFrame

conf = (
    SparkConf()
    .setAppName(APP_NAME)
    .set(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.glue_catalog.warehouse", BRONZE_PATH)
    .set(
        "spark.sql.catalog.glue_catalog.catalog-impl",
        "org.apache.iceberg.aws.glue.GlueCatalog",
    )
    .set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.shuffle.partitions", "100")
)

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)

glue_db = glueContext.create_dynamic_frame.from_catalog(database=DATABASE_NAME, table_name=LANDING_TABLE_NAME)
df = glue_db.toDF()
df.createOrReplaceTempView(APP_NAME)

# do all processing here...

df = df.sortWithinPartitions("issueid")
(
    df.writeTo(f"glue_catalog.bronze_{ENVIRONMENT}.{BRONZE_TABLE_NAME}")
    .using("iceberg")
    .tableProperty("format-version", "2")
    .tableProperty("location", BRONZE_PATH + BRONZE_TABLE_OUTPUT)
    .tableProperty("write.distribution.mode", "hash")
    .tableProperty("write.target-file-size-bytes", "536870912")
    .partitionedBy("issueid")
    .createOrReplace()
)
My output is shown below:
The desired output is a single compacted file per partition.
How can I achieve this?
Repartitioning by the partition column before the write worked for me:
df = df.repartition("issueid").sortWithinPartitions("issueid")
I'm not sure it is the best solution, so please feel free to improve on it.
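For anyone wondering why the repartition helps, here is a toy model in plain Python (not Spark or Iceberg code). It assumes that each write task emits one file for every distinct `issueid` it holds, which is my simplified reading of how a partitioned write behaves. The helper names `count_files`, `round_robin` and `by_key` are made up for the illustration.

```python
from collections import defaultdict


def count_files(task_rows):
    """Count files per partition value, assuming every task writes one
    file for each distinct partition value found in its rows."""
    files = defaultdict(int)
    for rows in task_rows:
        for key in set(rows):
            files[key] += 1
    return dict(files)


def round_robin(keys, num_tasks):
    """Spread rows over tasks without looking at the key
    (stands in for a DataFrame that is not clustered by issueid)."""
    tasks = [[] for _ in range(num_tasks)]
    for i, key in enumerate(keys):
        tasks[i % num_tasks].append(key)
    return tasks


def by_key(keys, num_tasks):
    """Send every row with the same key to the same task
    (stands in for df.repartition("issueid"))."""
    tasks = [[] for _ in range(num_tasks)]
    for key in keys:
        tasks[hash(key) % num_tasks].append(key)
    return tasks


# Three hypothetical issueid values, four rows each.
issue_ids = [i % 3 for i in range(12)]

scattered = count_files(round_robin(issue_ids, 4))
clustered = count_files(by_key(issue_ids, 4))

print("files per issueid without clustering:", scattered)
print("files per issueid after clustering:  ", clustered)
```

In this model the unclustered layout ends up with four files per `issueid` (one per task), while the clustered layout ends up with exactly one. As far as I know, a real partition that grows beyond `write.target-file-size-bytes` can still be split into more than one file, so "one file per partition" only holds while the partitions stay below that size.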