I was trying to write a parquet file with delta encoding. This page, states that parquet supports three types of delta encoding:
(DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY).
Since spark
, pyspark
or pyarrow
do not allow us to specify the encoding method, I was curious how one can write a file with delta encoding enabled?
However, I found on the internet that if I have columns with TimeStamp
type parquet will use delta encoding.
So I used the following code in scala
to create a parquet file. But encoding is not a delta.
val df = Seq(("2018-05-01"),
("2018-05-02"),
("2018-05-03"),
("2018-05-04"),
("2018-05-05"),
("2018-05-06"),
("2018-05-07"),
("2018-05-08"),
("2018-05-09"),
("2018-05-10")
).toDF("Id")
val df2 = df.withColumn("Timestamp", (col("Id").cast("timestamp")))
val df3 = df2.withColumn("Date", (col("Id").cast("date")))
df3.coalesce(1).write.format("parquet").mode("append").save("date_time2")
parquet-tools
shows the following information regarding the written parquet file.
file schema: spark_schema
--------------------------------------------------------------------------------
Id: OPTIONAL BINARY L:STRING R:0 D:1
Timestamp: OPTIONAL INT96 R:0 D:1
Date: OPTIONAL INT32 L:DATE R:0 D:1
row group 1: RC:31 TS:1100 OFFSET:4
--------------------------------------------------------------------------------
Id: BINARY SNAPPY DO:0 FPO:4 SZ:230/487/2.12 VC:31 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 2018-05-01, max: 2018-05-31, num_nulls: 0]
Timestamp: INT96 SNAPPY DO:0 FPO:234 SZ:212/436/2.06 VC:31 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[num_nulls: 0, min/max not defined]
Date: INT32 SNAPPY DO:0 FPO:446 SZ:181/177/0.98 VC:31 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 2018-05-01, max: 2018-05-31, num_nulls: 0]
As you can see, no column has used delta encoding.
My questions are:
How can I write a parquet file with delta encoding? (If you can
provide an example code in scala
or python
that would be great.)
How to decide which "delta encoding": (DELTA_BINARY_PACKED,
DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY)
to use?
It was really challenging to figure out how to enable DELTA encoding when generating parquet files using PySpark.
We generate lots of numeric data which can really benefit using DELTA encoding. In my test, I was able to reduce a small test file of 136.9MB to 101.6MB using DELTA encoding. For our use case we generate TBs of data so the S3 savings going forward are worth considering it.
My experience is with Spark 2.4.5 using EMR 5.29.0. There are lots of gotchas that I encountered both before and after generating DELTA encoded files. I will mention them so that you are aware of the issues and don't trip yourselves.
In order to generate the DELTA encoded parquet file in PySpark, we need to enable version 2 of the Parquet write. This is the only way it works. Also, for some reason the setting only works when creating the spark context. The setting is:
"spark.hadoop.parquet.writer.version": "v2"
and the result is:
time: INT64 GZIP DO:0 FPO:11688 SZ:84010/2858560/34.03 VC:15043098 ENC:DELTA_BINARY_PACKED ST:[min: 1577715561210, max: 1577839907009, num_nulls: 0]
HOWEVER, one cannot read the same file back in PySpark as is as you will get
java.lang.UnsupportedOperationException: Unsupported encoding: DELTA_BINARY_PACKED
In order to read the file back, one needs to disable the following conf:
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
Also, a sad but bonus tip, Pandas will not read these files as of writing this post.
Parquet files v2.0 created by spark can't be read by pyarrow