I have an AWS Glue job that uses the "Glue 4.0 - Spark 3.3, Scala 2, Python 3" version. It reads individual Parquet files and writes them to Delta Lake. I am using "write.parquet.compression-codec": "snappy", which comes from the code generated by AWS Glue Visual ETL.
Now I would like to switch to Zstandard with "write.parquet.compression-codec": "zstd" instead.
Here is my new code:
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            "s3://my-bucket/data/raw-parquet/motor/"
        ],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

additional_options = {
    "path": "s3://my-bucket/data/new-zstd-delta-tables/motor/",  # <- An empty new folder
    "write.parquet.compression-codec": "zstd",  # <- Changed to zstd
    "mergeSchema": "true",
}

sink_to_delta_lake_node3_df = S3bucket_node1.toDF()
sink_to_delta_lake_node3_df.write.format("delta").options(**additional_options).mode(
    "overwrite"
).save()
job.commit()
However, the final Parquet files generated in the Delta Lake table still look like part-00107-85b2560f-81a4-4d05-8d09-5df143685dbf.c000.snappy.parquet. Note the snappy in the name.
I also verified that the file content actually uses the "snappy" compression codec:
import pyarrow.parquet as pq
parquet_file_path = "part-00107-85b2560f-81a4-4d05-8d09-5df143685dbf.c000.snappy.parquet"
print(pq.ParquetFile(parquet_file_path).metadata.row_group(0).column(0).compression)
# It prints "SNAPPY"
How do I correctly write to Delta Lake with the "zstd" compression codec in AWS Glue? Thanks!
You need to use the compression option instead (doc). For example:
spark.range(100).write.format("delta")\
.option("compression", "zstd").mode("overwrite").save("zstd.delta")
will give you the desired output (tested on Databricks & local Spark):
# ls zstd.delta
_delta_log/
part-00000-b7078370-6b8f-4632-8071-8ee2dbc61194-c000.zstd.parquet
part-00001-3201e316-1cc7-4f78-9945-f7b4b21922b0-c000.zstd.parquet
part-00002-d25fb0ed-bf7d-468f-bd98-df480c2acabd-c000.zstd.parquet
part-00003-b9e60fb8-2d7a-4e4f-981c-a4c9314c9b41-c000.zstd.parquet
part-00004-224f9282-930b-4a89-ba08-2c4b752781dd-c000.zstd.parquet
part-00005-4671308a-3e6a-4cba-83ad-bb6e7f404d68-c000.zstd.parquet
part-00006-2b08bbf9-7ece-4ccd-828b-88713fe226f9-c000.zstd.parquet
part-00007-53fc0ebb-29b1-496c-bcce-f2994ec04226-c000.zstd.parquet
part-00008-f0cf609d-d1c4-4805-8586-dd6e9481cc9b-c000.zstd.parquet
part-00009-a918c5c8-77ea-4f6d-b559-d36740e5a3bb-c000.zstd.parquet
part-00010-4a47e50d-dfdc-4f7f-826a-7e273a2fd404-c000.zstd.parquet
part-00011-95625e5a-7130-4f45-bf6c-bc721fe7561b-c000.zstd.parquet
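Applied to your Glue job, that means passing compression instead of write.parquet.compression-codec in additional_options. A minimal sketch reusing the S3bucket_node1 DynamicFrame from your script (I only tested the .option() form above, not this exact Glue job; .options(**dict) is equivalent to chaining .option() calls):

additional_options = {
    "path": "s3://my-bucket/data/new-zstd-delta-tables/motor/",
    "compression": "zstd",  # <- Delta writer option, instead of write.parquet.compression-codec
    "mergeSchema": "true",
}

sink_to_delta_lake_node3_df = S3bucket_node1.toDF()
sink_to_delta_lake_node3_df.write.format("delta").options(**additional_options).mode(
    "overwrite"
).save()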
Alternatively, you can set it globally:
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
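In your Glue script, that setting would go right after the Spark session is created and before any write. A sketch of the job preamble, assuming the rest of your job stays unchanged:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Make zstd the default Parquet codec for every write in this session
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")

job = Job(glueContext)
job.init(args["JOB_NAME"], args)

Either way, re-running your pyarrow check on one of the new part files should print "ZSTD".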