Tags: pyspark, aws-glue, apache-iceberg

PySpark with Iceberg saving multiple small files


I have a PySpark job running on AWS Glue. The job processes the data and saves it as an Apache Iceberg table. The problem is that the save generates multiple small files within each partition. I have tested several ways of saving the data, but none of them resolved the issue. Here is my code snippet.

import pyspark.sql.functions as f
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import DataFrame

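# Spark configuration: enable the Iceberg extensions and register the Glue catalog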
conf = (
    SparkConf()
    .setAppName(APP_NAME)
    .set(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.glue_catalog.warehouse", BRONZE_PATH)
    .set(
        "spark.sql.catalog.glue_catalog.catalog-impl",
        "org.apache.iceberg.aws.glue.GlueCatalog",
    )
    .set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.shuffle.partitions", "100")
)
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
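# Load the landing table from the Glue Data Catalog and convert it to a DataFrame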
glue_db = glueContext.create_dynamic_frame.from_catalog(database=DATABASE_NAME, table_name=LANDING_TABLE_NAME)
df = glue_db.toDF()
df.createOrReplaceTempView(APP_NAME)

# do all processing here...

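# Sort within partitions and write to an Iceberg v2 table partitioned by issueid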
df = df.sortWithinPartitions("issueid")
(
    df.writeTo(f"glue_catalog.bronze_{ENVIRONMENT}.{BRONZE_TABLE_NAME}")
    .using("iceberg")
    .tableProperty("format-version", "2")
    .tableProperty("location", BRONZE_PATH + BRONZE_TABLE_OUTPUT)
    .tableProperty("write.distribution.mode", "hash")
    .tableProperty("write.target-file-size-bytes", "536870912")
    .partitionedBy("issueid")
    .createOrReplace()
)

My output is shown below:

[screenshot: each partition folder contains multiple small data files]

What I want instead is a single compacted file per partition.

How can I achieve this?


Solution

  • Repartitioning by the partition column worked for me:

    df = df.repartition("issueid").sortWithinPartitions("issueid")
    

    I'm not sure whether this is the best solution, so feel free to improve it. A sketch of how it fits into the original write follows below.
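
A minimal sketch of how the repartition fits into the original write path, assuming the same ENVIRONMENT, BRONZE_PATH, BRONZE_TABLE_NAME and BRONZE_TABLE_OUTPUT constants as in the question. repartition("issueid") hash-partitions the DataFrame so that all rows sharing an issueid land in a single task, and each task then writes at most one file per Iceberg partition. Note also that the Iceberg table property is spelled write.distribution-mode (with a hyphen), so the dotted write.distribution.mode in the question is likely ignored.

# Shuffle so each issueid is handled by a single task,
# then sort within each task before writing.
df = df.repartition("issueid").sortWithinPartitions("issueid")

(
    df.writeTo(f"glue_catalog.bronze_{ENVIRONMENT}.{BRONZE_TABLE_NAME}")
    .using("iceberg")
    .tableProperty("format-version", "2")
    .tableProperty("location", BRONZE_PATH + BRONZE_TABLE_OUTPUT)
    # hyphenated property name: write.distribution-mode
    .tableProperty("write.distribution-mode", "hash")
    .tableProperty("write.target-file-size-bytes", "536870912")
    .partitionedBy("issueid")
    .createOrReplace()
)

If the table already contains many small files from earlier runs, Iceberg's rewrite_data_files Spark procedure can compact them in place, for example: spark.sql(f"CALL glue_catalog.system.rewrite_data_files(table => 'bronze_{ENVIRONMENT}.{BRONZE_TABLE_NAME}')").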