Tags: amazon-web-services, apache-spark, pyspark, aws-glue, delta-lake

How to write to Delta Lake using "zstd" compression codec in AWS Glue?


I have an AWS Glue job that uses the "Glue 4.0 - Spark 3.3, Scala 2, Python 3" version.

It reads individual Parquet files and writes to Delta Lake. I am using "write.parquet.compression-codec": "snappy", which comes from the code generated by AWS Glue Visual ETL.

Now I am hoping to switch to Zstandard by using "write.parquet.compression-codec": "zstd" instead.

Here is my new code:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            "s3://my-bucket/data/raw-parquet/motor/"
        ],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

additional_options = {
    "path": "s3://my-bucket/data/new-zstd-delta-tables/motor/", # <- An empty new folder
    "write.parquet.compression-codec": "zstd", # <- Changed to zstd
    "mergeSchema": "true",
}
sink_to_delta_lake_node3_df = S3bucket_node1.toDF()
sink_to_delta_lake_node3_df.write.format("delta").options(**additional_options).mode(
    "overwrite"
).save()

job.commit()

However, the final Parquet files generated in Delta Lake still look like part-00107-85b2560f-81a4-4d05-8d09-5df143685dbf.c000.snappy.parquet. Note the snappy in the name.

I also verified that the file content actually uses the "snappy" compression codec:

import pyarrow.parquet as pq

parquet_file_path = "part-00107-85b2560f-81a4-4d05-8d09-5df143685dbf.c000.snappy.parquet"
print(pq.ParquetFile(parquet_file_path).metadata.row_group(0).column(0).compression)
# It prints "SNAPPY"

How do I correctly write to Delta Lake using the "zstd" compression codec in AWS Glue? Thanks!


Solution

  • You need to use the compression option instead (doc); the Delta writer does not recognize "write.parquet.compression-codec" and silently ignores it. For example:

    spark.range(100).write.format("delta")\
      .option("compression", "zstd").mode("overwrite").save("zstd.delta")
    

    will give you the desired output (tested on Databricks & local Spark):

    # ls zstd.delta
    _delta_log/
    part-00000-b7078370-6b8f-4632-8071-8ee2dbc61194-c000.zstd.parquet
    part-00001-3201e316-1cc7-4f78-9945-f7b4b21922b0-c000.zstd.parquet
    part-00002-d25fb0ed-bf7d-468f-bd98-df480c2acabd-c000.zstd.parquet
    part-00003-b9e60fb8-2d7a-4e4f-981c-a4c9314c9b41-c000.zstd.parquet
    part-00004-224f9282-930b-4a89-ba08-2c4b752781dd-c000.zstd.parquet
    part-00005-4671308a-3e6a-4cba-83ad-bb6e7f404d68-c000.zstd.parquet
    part-00006-2b08bbf9-7ece-4ccd-828b-88713fe226f9-c000.zstd.parquet
    part-00007-53fc0ebb-29b1-496c-bcce-f2994ec04226-c000.zstd.parquet
    part-00008-f0cf609d-d1c4-4805-8586-dd6e9481cc9b-c000.zstd.parquet
    part-00009-a918c5c8-77ea-4f6d-b559-d36740e5a3bb-c000.zstd.parquet
    part-00010-4a47e50d-dfdc-4f7f-826a-7e273a2fd404-c000.zstd.parquet
    part-00011-95625e5a-7130-4f45-bf6c-bc721fe7561b-c000.zstd.parquet
    
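    Applied to the Glue job from the question, a minimal sketch of the sink step (same DataFrame and S3 path as in the question, with the ineffective key replaced by the compression option) would be:

    # Same DataFrame as in the question; only the writer options change.
    sink_to_delta_lake_node3_df = S3bucket_node1.toDF()
    sink_to_delta_lake_node3_df.write.format("delta")\
        .option("compression", "zstd")\
        .option("mergeSchema", "true")\
        .mode("overwrite")\
        .save("s3://my-bucket/data/new-zstd-delta-tables/motor/")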

    Alternatively, you can set it globally (before the write):

    spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
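
    Either way, you can confirm the result with the same pyarrow check used in the question, pointed at one of the newly written part files (the file name below is taken from the listing above):

    import pyarrow.parquet as pq

    parquet_file_path = "part-00000-b7078370-6b8f-4632-8071-8ee2dbc61194-c000.zstd.parquet"
    print(pq.ParquetFile(parquet_file_path).metadata.row_group(0).column(0).compression)
    # It prints "ZSTD"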